This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9236eb9  [FLINK-10690][tests] Fix Files.list resource leaks
9236eb9 is described below

commit 9236eb9adb3597aec5f17d44acfd68718c6cd40d
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed Oct 31 11:16:37 2018 +0100

    [FLINK-10690][tests] Fix Files.list resource leaks
---
 .../java/org/apache/flink/util/FileUtilsTest.java  | 19 ++++++++++++------
 .../tests/DistributedCacheViaBlobTestProgram.java  | 13 ++++++++----
 .../fs/s3/common/utils/RefCountedFileTest.java     | 14 ++++++++++---
 .../fs/s3hadoop/HadoopS3RecoverableWriterTest.java |  4 +++-
 .../runtime/rest/MultipartUploadResource.java      | 21 +++++++++++---------
 .../jobmaster/JobMasterTriggerSavepointIT.java     | 11 +++++++++--
 .../ResumeCheckpointManuallyITCase.java            | 23 ++++++++++++----------
 .../test/java/org/apache/flink/yarn/UtilsTest.java | 13 +++++++++---
 8 files changed, 80 insertions(+), 38 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
index 23878cb..76b7805 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -38,6 +38,7 @@ import java.nio.file.Paths;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -227,12 +228,18 @@ public class FileUtilsTest extends TestLogger {
                assertEquals(expected.getFileName(), actual.getFileName());
 
                if (Files.isDirectory(expected)) {
-                       List<java.nio.file.Path> expectedContents = 
Files.list(expected)
-                               
.sorted(Comparator.comparing(java.nio.file.Path::toString))
-                               .collect(Collectors.toList());
-                       List<java.nio.file.Path> actualContents = 
Files.list(actual)
-                               
.sorted(Comparator.comparing(java.nio.file.Path::toString))
-                               .collect(Collectors.toList());
+                       List<java.nio.file.Path> expectedContents;
+                       try (Stream<java.nio.file.Path> files = 
Files.list(expected)) {
+                               expectedContents = files
+                                       
.sorted(Comparator.comparing(java.nio.file.Path::toString))
+                                       .collect(Collectors.toList());
+                       }
+                       List<java.nio.file.Path> actualContents;
+                       try (Stream<java.nio.file.Path> files = 
Files.list(actual)) {
+                               actualContents = files
+                                       
.sorted(Comparator.comparing(java.nio.file.Path::toString))
+                                       .collect(Collectors.toList());
+                       }
 
                        assertEquals(expectedContents.size(), 
actualContents.size());
 
diff --git 
a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
 
b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
index 167101c..388cdc6 100644
--- 
a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
@@ -26,6 +26,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * End-to-end test program for verifying that files are distributed via 
BlobServer and later accessible through
@@ -47,8 +48,10 @@ public class DistributedCacheViaBlobTestProgram {
                env.registerCachedFile(inputFile.toString(), "test_data", 
false);
                env.registerCachedFile(inputDir.toString(), "test_dir", false);
 
-               Path containedFile = Files.list(inputDir).findAny()
-                       .orElseThrow(() -> new RuntimeException("Input 
directory must not be empty."));
+               final Path containedFile;
+               try (Stream<Path> files = Files.list(inputDir)) {
+                       containedFile = files.findAny().orElseThrow(() -> new 
RuntimeException("Input directory must not be empty."));
+               }
 
                env.fromElements(1)
                        .map(new TestMapFunction(
@@ -96,8 +99,10 @@ public class DistributedCacheViaBlobTestProgram {
                                        "initial dir. Input dir path: %s. Cache 
dir path: %s", initialDirPath, testDir));
                        }
 
-                       if 
(Files.list(testDir).map(Path::getFileName).map(Path::toString).noneMatch(path 
-> path.equals(containedFileName))) {
-                               throw new 
RuntimeException(String.format("Cached directory %s should not be empty.", 
testDir));
+                       try (Stream<Path> files = Files.list(testDir)) {
+                               if 
(files.map(Path::getFileName).map(Path::toString).noneMatch(path -> 
path.equals(containedFileName))) {
+                                       throw new 
RuntimeException(String.format("Cached directory %s should not be empty.", 
testDir));
+                               }
                        }
 
                        return Files.readAllLines(testFile)
diff --git 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
index 1042683..2e03197 100644
--- 
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
+++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
@@ -28,8 +28,10 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 /**
  * Tests for the {@link RefCountedFile}.
@@ -49,7 +51,9 @@ public class RefCountedFileTest {
 
                fileUnderTest.release();
 
-               Assert.assertEquals(0L, 
Files.list(temporaryFolder.getRoot().toPath()).count());
+               try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
+                       Assert.assertEquals(0L, files.count());
+               }
        }
 
        @Test
@@ -76,7 +80,9 @@ public class RefCountedFileTest {
 
                fileUnderTest.release();
                // the file is deleted now
-               Assert.assertEquals(0L, 
Files.list(temporaryFolder.getRoot().toPath()).count());
+               try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
+                       Assert.assertEquals(0L, files.count());
+               }
        }
 
        @Test
@@ -111,7 +117,9 @@ public class RefCountedFileTest {
        // ------------------------------------- Utilities 
-------------------------------------
 
        private void verifyTheFileIsStillThere() throws IOException {
-               Assert.assertEquals(1L, 
Files.list(temporaryFolder.getRoot().toPath()).count());
+               try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
+                       Assert.assertEquals(1L, files.count());
+               }
        }
 
        private RefCountedFile getClosedRefCountedFileWithContent(String 
content) throws IOException {
diff --git 
a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
 
b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
index 0f46f58..4a1368a 100644
--- 
a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
+++ 
b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
@@ -164,7 +164,9 @@ public class HadoopS3RecoverableWriterTest extends 
TestLogger {
 
                // delete local tmp dir.
                Assert.assertTrue(Files.exists(localTmpDir));
-               Assert.assertEquals(0L, Files.list(localTmpDir).count());
+               try (Stream<java.nio.file.Path> files = 
Files.list(localTmpDir)) {
+                       Assert.assertEquals(0L, files.count());
+               }
                Files.delete(localTmpDir);
 
                // delete also S3 dir.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index c350393..22de8a1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -63,10 +63,10 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Objects.requireNonNull;
 import static org.junit.Assert.assertArrayEquals;
@@ -182,14 +182,17 @@ public class MultipartUploadResource extends 
ExternalResource {
        }
 
        public void assertUploadDirectoryIsEmpty() throws IOException {
-               Preconditions.checkArgument(
-                       1 == Files.list(configuredUploadDir).count(),
-                       "Directory structure in rest upload directory has 
changed. Test must be adjusted");
-               Optional<Path> actualUploadDir = 
Files.list(configuredUploadDir).findAny();
-               Preconditions.checkArgument(
-                       actualUploadDir.isPresent(),
-                       "Expected upload directory does not exist.");
-               assertEquals("Not all files were cleaned up.", 0, 
Files.list(actualUploadDir.get()).count());
+               Path actualUploadDir;
+               try (Stream<Path> containedFiles = 
Files.list(configuredUploadDir)) {
+                       List<Path> files = 
containedFiles.collect(Collectors.toList());
+                       Preconditions.checkArgument(
+                               1 == files.size(),
+                               "Directory structure in rest upload directory 
has changed. Test must be adjusted");
+                       actualUploadDir = files.get(0);
+               }
+               try (Stream<Path> containedFiles = Files.list(actualUploadDir)) 
{
+                       assertEquals("Not all files were cleaned up.", 0, 
containedFiles.count());
+               }
        }
 
        /**
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
index 95c357d..5cbde5d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
@@ -53,6 +53,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -122,7 +123,10 @@ public class JobMasterTriggerSavepointIT extends 
AbstractTestBase {
 
                assertThat(jobStatus, isOneOf(JobStatus.CANCELED, 
JobStatus.CANCELLING));
 
-               final List<Path> savepoints = 
Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
+               final List<Path> savepoints;
+               try (Stream<Path> savepointFiles = 
Files.list(savepointDirectory)) {
+                       savepoints = 
savepointFiles.map(Path::getFileName).collect(Collectors.toList());
+               }
                assertThat(savepoints, 
hasItem(Paths.get(savepointLocation).getFileName()));
        }
 
@@ -136,7 +140,10 @@ public class JobMasterTriggerSavepointIT extends 
AbstractTestBase {
 
                assertThat(jobStatus, isOneOf(JobStatus.CANCELED, 
JobStatus.CANCELLING));
 
-               final List<Path> savepoints = 
Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
+               final List<Path> savepoints;
+               try (Stream<Path> savepointFiles = 
Files.list(savepointDirectory)) {
+                       savepoints = 
savepointFiles.map(Path::getFileName).collect(Collectors.toList());
+               }
                assertThat(savepoints, 
hasItem(Paths.get(savepointLocation).getFileName()));
        }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 0635f23..4a08b7c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -54,6 +54,7 @@ import java.nio.file.Path;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertNotNull;
 
@@ -326,16 +327,18 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
        }
 
        private static Optional<Path> findExternalizedCheckpoint(File 
checkpointDir, JobID jobId) throws IOException {
-               return 
Files.list(checkpointDir.toPath().resolve(jobId.toString()))
-                       .filter(path -> 
path.getFileName().toString().startsWith("chk-"))
-                       .filter(path -> {
-                               try {
-                                       return Files.list(path).anyMatch(child 
-> child.getFileName().toString().contains("meta"));
-                               } catch (IOException ignored) {
-                                       return false;
-                               }
-                       })
-                       .findAny();
+               try (Stream<Path> checkpoints = 
Files.list(checkpointDir.toPath().resolve(jobId.toString()))) {
+                       return checkpoints
+                               .filter(path -> 
path.getFileName().toString().startsWith("chk-"))
+                               .filter(path -> {
+                                       try (Stream<Path> checkpointFiles = 
Files.list(path)) {
+                                               return 
checkpointFiles.anyMatch(child -> 
child.getFileName().toString().contains("meta"));
+                                       } catch (IOException ignored) {
+                                               return false;
+                                       }
+                               })
+                               .findAny();
+               }
        }
 
        private static void waitUntilCanceled(JobID jobId, ClusterClient<?> 
client) throws ExecutionException, InterruptedException {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 5fc3567..b272a21 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -25,6 +25,7 @@ import org.junit.rules.TemporaryFolder;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
+import java.util.stream.Stream;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -41,12 +42,18 @@ public class UtilsTest {
        public void testDeleteApplicationFiles() throws Exception {
                final Path applicationFilesDir = 
temporaryFolder.newFolder(".flink").toPath();
                Files.createFile(applicationFilesDir.resolve("flink.jar"));
-               
assertThat(Files.list(temporaryFolder.getRoot().toPath()).count(), equalTo(1L));
-               assertThat(Files.list(applicationFilesDir).count(), 
equalTo(1L));
+               try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
+                       assertThat(files.count(), equalTo(1L));
+               }
+               try (Stream<Path> files = Files.list(applicationFilesDir)) {
+                       assertThat(files.count(), equalTo(1L));
+               }
 
                Utils.deleteApplicationFiles(Collections.singletonMap(
                        YarnConfigKeys.FLINK_YARN_FILES,
                        applicationFilesDir.toString()));
-               
assertThat(Files.list(temporaryFolder.getRoot().toPath()).count(), equalTo(0L));
+               try (Stream<Path> files = 
Files.list(temporaryFolder.getRoot().toPath())) {
+                       assertThat(files.count(), equalTo(0L));
+               }
        }
 }

Reply via email to