This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9236eb9 [FLINK-10690][tests] Fix Files.list resource leaks
9236eb9 is described below
commit 9236eb9adb3597aec5f17d44acfd68718c6cd40d
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed Oct 31 11:16:37 2018 +0100
[FLINK-10690][tests] Fix Files.list resource leaks
---
.../java/org/apache/flink/util/FileUtilsTest.java | 19 ++++++++++++------
.../tests/DistributedCacheViaBlobTestProgram.java | 13 ++++++++----
.../fs/s3/common/utils/RefCountedFileTest.java | 14 ++++++++++---
.../fs/s3hadoop/HadoopS3RecoverableWriterTest.java | 4 +++-
.../runtime/rest/MultipartUploadResource.java | 21 +++++++++++---------
.../jobmaster/JobMasterTriggerSavepointIT.java | 11 +++++++++--
.../ResumeCheckpointManuallyITCase.java | 23 ++++++++++++----------
.../test/java/org/apache/flink/yarn/UtilsTest.java | 13 +++++++++---
8 files changed, 80 insertions(+), 38 deletions(-)
diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
index 23878cb..76b7805 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -38,6 +38,7 @@ import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -227,12 +228,18 @@ public class FileUtilsTest extends TestLogger {
assertEquals(expected.getFileName(), actual.getFileName());
if (Files.isDirectory(expected)) {
- List<java.nio.file.Path> expectedContents =
Files.list(expected)
-
.sorted(Comparator.comparing(java.nio.file.Path::toString))
- .collect(Collectors.toList());
- List<java.nio.file.Path> actualContents =
Files.list(actual)
-
.sorted(Comparator.comparing(java.nio.file.Path::toString))
- .collect(Collectors.toList());
+ List<java.nio.file.Path> expectedContents;
+ try (Stream<java.nio.file.Path> files =
Files.list(expected)) {
+ expectedContents = files
+
.sorted(Comparator.comparing(java.nio.file.Path::toString))
+ .collect(Collectors.toList());
+ }
+ List<java.nio.file.Path> actualContents;
+ try (Stream<java.nio.file.Path> files =
Files.list(actual)) {
+ actualContents = files
+
.sorted(Comparator.comparing(java.nio.file.Path::toString))
+ .collect(Collectors.toList());
+ }
assertEquals(expectedContents.size(),
actualContents.size());
diff --git
a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
index 167101c..388cdc6 100644
---
a/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
+++
b/flink-end-to-end-tests/flink-distributed-cache-via-blob-test/src/main/java/org/apache/flink/streaming/tests/DistributedCacheViaBlobTestProgram.java
@@ -26,6 +26,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* End-to-end test program for verifying that files are distributed via
BlobServer and later accessible through
@@ -47,8 +48,10 @@ public class DistributedCacheViaBlobTestProgram {
env.registerCachedFile(inputFile.toString(), "test_data",
false);
env.registerCachedFile(inputDir.toString(), "test_dir", false);
- Path containedFile = Files.list(inputDir).findAny()
- .orElseThrow(() -> new RuntimeException("Input
directory must not be empty."));
+ final Path containedFile;
+ try (Stream<Path> files = Files.list(inputDir)) {
+ containedFile = files.findAny().orElseThrow(() -> new
RuntimeException("Input directory must not be empty."));
+ }
env.fromElements(1)
.map(new TestMapFunction(
@@ -96,8 +99,10 @@ public class DistributedCacheViaBlobTestProgram {
"initial dir. Input dir path: %s. Cache
dir path: %s", initialDirPath, testDir));
}
- if
(Files.list(testDir).map(Path::getFileName).map(Path::toString).noneMatch(path
-> path.equals(containedFileName))) {
- throw new
RuntimeException(String.format("Cached directory %s should not be empty.",
testDir));
+ try (Stream<Path> files = Files.list(testDir)) {
+ if
(files.map(Path::getFileName).map(Path::toString).noneMatch(path ->
path.equals(containedFileName))) {
+ throw new
RuntimeException(String.format("Cached directory %s should not be empty.",
testDir));
+ }
}
return Files.readAllLines(testFile)
diff --git
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
index 1042683..2e03197 100644
---
a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
+++
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedFileTest.java
@@ -28,8 +28,10 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;
+import java.util.stream.Stream;
/**
* Tests for the {@link RefCountedFile}.
@@ -49,7 +51,9 @@ public class RefCountedFileTest {
fileUnderTest.release();
- Assert.assertEquals(0L,
Files.list(temporaryFolder.getRoot().toPath()).count());
+ try (Stream<Path> files =
Files.list(temporaryFolder.getRoot().toPath())) {
+ Assert.assertEquals(0L, files.count());
+ }
}
@Test
@@ -76,7 +80,9 @@ public class RefCountedFileTest {
fileUnderTest.release();
// the file is deleted now
- Assert.assertEquals(0L,
Files.list(temporaryFolder.getRoot().toPath()).count());
+ try (Stream<Path> files =
Files.list(temporaryFolder.getRoot().toPath())) {
+ Assert.assertEquals(0L, files.count());
+ }
}
@Test
@@ -111,7 +117,9 @@ public class RefCountedFileTest {
// ------------------------------------- Utilities
-------------------------------------
private void verifyTheFileIsStillThere() throws IOException {
- Assert.assertEquals(1L,
Files.list(temporaryFolder.getRoot().toPath()).count());
+ try (Stream<Path> files =
Files.list(temporaryFolder.getRoot().toPath())) {
+ Assert.assertEquals(1L, files.count());
+ }
}
private RefCountedFile getClosedRefCountedFileWithContent(String
content) throws IOException {
diff --git
a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
index 0f46f58..4a1368a 100644
---
a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
+++
b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterTest.java
@@ -164,7 +164,9 @@ public class HadoopS3RecoverableWriterTest extends
TestLogger {
// delete local tmp dir.
Assert.assertTrue(Files.exists(localTmpDir));
- Assert.assertEquals(0L, Files.list(localTmpDir).count());
+ try (Stream<java.nio.file.Path> files =
Files.list(localTmpDir)) {
+ Assert.assertEquals(0L, files.count());
+ }
Files.delete(localTmpDir);
// delete also S3 dir.
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
index c350393..22de8a1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadResource.java
@@ -63,10 +63,10 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertArrayEquals;
@@ -182,14 +182,17 @@ public class MultipartUploadResource extends
ExternalResource {
}
public void assertUploadDirectoryIsEmpty() throws IOException {
- Preconditions.checkArgument(
- 1 == Files.list(configuredUploadDir).count(),
- "Directory structure in rest upload directory has
changed. Test must be adjusted");
- Optional<Path> actualUploadDir =
Files.list(configuredUploadDir).findAny();
- Preconditions.checkArgument(
- actualUploadDir.isPresent(),
- "Expected upload directory does not exist.");
- assertEquals("Not all files were cleaned up.", 0,
Files.list(actualUploadDir.get()).count());
+ Path actualUploadDir;
+ try (Stream<Path> containedFiles =
Files.list(configuredUploadDir)) {
+ List<Path> files =
containedFiles.collect(Collectors.toList());
+ Preconditions.checkArgument(
+ 1 == files.size(),
+ "Directory structure in rest upload directory
has changed. Test must be adjusted");
+ actualUploadDir = files.get(0);
+ }
+ try (Stream<Path> containedFiles = Files.list(actualUploadDir))
{
+ assertEquals("Not all files were cleaned up.", 0,
containedFiles.count());
+ }
}
/**
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
index 95c357d..5cbde5d 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
@@ -53,6 +53,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -122,7 +123,10 @@ public class JobMasterTriggerSavepointIT extends
AbstractTestBase {
assertThat(jobStatus, isOneOf(JobStatus.CANCELED,
JobStatus.CANCELLING));
- final List<Path> savepoints =
Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
+ final List<Path> savepoints;
+ try (Stream<Path> savepointFiles =
Files.list(savepointDirectory)) {
+ savepoints =
savepointFiles.map(Path::getFileName).collect(Collectors.toList());
+ }
assertThat(savepoints,
hasItem(Paths.get(savepointLocation).getFileName()));
}
@@ -136,7 +140,10 @@ public class JobMasterTriggerSavepointIT extends
AbstractTestBase {
assertThat(jobStatus, isOneOf(JobStatus.CANCELED,
JobStatus.CANCELLING));
- final List<Path> savepoints =
Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
+ final List<Path> savepoints;
+ try (Stream<Path> savepointFiles =
Files.list(savepointDirectory)) {
+ savepoints =
savepointFiles.map(Path::getFileName).collect(Collectors.toList());
+ }
assertThat(savepoints,
hasItem(Paths.get(savepointLocation).getFileName()));
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 0635f23..4a08b7c 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -54,6 +54,7 @@ import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
import static org.junit.Assert.assertNotNull;
@@ -326,16 +327,18 @@ public class ResumeCheckpointManuallyITCase extends
TestLogger {
}
private static Optional<Path> findExternalizedCheckpoint(File
checkpointDir, JobID jobId) throws IOException {
- return
Files.list(checkpointDir.toPath().resolve(jobId.toString()))
- .filter(path ->
path.getFileName().toString().startsWith("chk-"))
- .filter(path -> {
- try {
- return Files.list(path).anyMatch(child
-> child.getFileName().toString().contains("meta"));
- } catch (IOException ignored) {
- return false;
- }
- })
- .findAny();
+ try (Stream<Path> checkpoints =
Files.list(checkpointDir.toPath().resolve(jobId.toString()))) {
+ return checkpoints
+ .filter(path ->
path.getFileName().toString().startsWith("chk-"))
+ .filter(path -> {
+ try (Stream<Path> checkpointFiles =
Files.list(path)) {
+ return
checkpointFiles.anyMatch(child ->
child.getFileName().toString().contains("meta"));
+ } catch (IOException ignored) {
+ return false;
+ }
+ })
+ .findAny();
+ }
}
private static void waitUntilCanceled(JobID jobId, ClusterClient<?>
client) throws ExecutionException, InterruptedException {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 5fc3567..b272a21 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -25,6 +25,7 @@ import org.junit.rules.TemporaryFolder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
+import java.util.stream.Stream;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -41,12 +42,18 @@ public class UtilsTest {
public void testDeleteApplicationFiles() throws Exception {
final Path applicationFilesDir =
temporaryFolder.newFolder(".flink").toPath();
Files.createFile(applicationFilesDir.resolve("flink.jar"));
-
assertThat(Files.list(temporaryFolder.getRoot().toPath()).count(), equalTo(1L));
- assertThat(Files.list(applicationFilesDir).count(),
equalTo(1L));
+ try (Stream<Path> files =
Files.list(temporaryFolder.getRoot().toPath())) {
+ assertThat(files.count(), equalTo(1L));
+ }
+ try (Stream<Path> files = Files.list(applicationFilesDir)) {
+ assertThat(files.count(), equalTo(1L));
+ }
Utils.deleteApplicationFiles(Collections.singletonMap(
YarnConfigKeys.FLINK_YARN_FILES,
applicationFilesDir.toString()));
-
assertThat(Files.list(temporaryFolder.getRoot().toPath()).count(), equalTo(0L));
+ try (Stream<Path> files =
Files.list(temporaryFolder.getRoot().toPath())) {
+ assertThat(files.count(), equalTo(0L));
+ }
}
}