[FLINK-6820] Activate checkstyle for runtime/filecache. This closes #4062.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ae4f2b0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ae4f2b0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ae4f2b0 Branch: refs/heads/master Commit: 8ae4f2b0ae103540435244ff75ab4b593670dd76 Parents: 92cd736 Author: zentol <[email protected]> Authored: Fri Jun 2 21:06:16 2017 +0200 Committer: zentol <[email protected]> Committed: Wed Jun 7 23:06:08 2017 +0200 ---------------------------------------------------------------------- flink-runtime/pom.xml | 1 - .../flink/runtime/filecache/FileCache.java | 36 +++++++++--------- .../FileCacheDeleteValidationTest.java | 39 ++++++++++---------- 3 files changed, 36 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8ae4f2b0/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 3cee8d8..602f788 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -430,7 +430,6 @@ under the License. 
**/runtime/deployment/**, **/runtime/execution/**, **/runtime/executiongraph/**, - **/runtime/filecache/**, **/runtime/fs/**, **/runtime/heartbeat/**, **/runtime/highavailability/**, http://git-wip-us.apache.org/repos/asf/flink/blob/8ae4f2b0/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java index 4f2166f..84b8feb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.filecache; -import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.api.java.tuple.Tuple4; @@ -31,6 +30,7 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; +import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +54,8 @@ import java.util.concurrent.TimeUnit; public class FileCache { static final Logger LOG = LoggerFactory.getLogger(FileCache.class); - - /** cache-wide lock to ensure consistency. copies are not done under this lock */ + + /** cache-wide lock to ensure consistency. copies are not done under this lock. 
*/ private final Object lock = new Object(); private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries; @@ -99,12 +99,12 @@ public class FileCache { this.shutdownHook = createShutdownHook(this, LOG); this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>(); - this.executorService = Executors.newScheduledThreadPool(10, + this.executorService = Executors.newScheduledThreadPool(10, new ExecutorThreadFactory("flink-file-cache")); } /** - * Shuts down the file cache by cancelling all + * Shuts down the file cache by cancelling all. */ public void shutdown() { synchronized (lock) { @@ -119,9 +119,9 @@ public class FileCache { // may happen } } - + entries.clear(); - + // clean up the all storage directories for (File dir : storageDirectories) { try { @@ -172,7 +172,7 @@ public class FileCache { // file is already in the cache. return a future that // immediately returns the file fileEntry.f0 = fileEntry.f0 + 1; - + // return the future. may be that the copy is still in progress return fileEntry.f3; } @@ -197,10 +197,10 @@ public class FileCache { CopyProcess cp = new CopyProcess(entry, target); FutureTask<Path> copyTask = new FutureTask<Path>(cp); executorService.submit(copyTask); - + // store our entry jobEntries.put(name, new Tuple4<Integer, File, Path, Future<Path>>(1, tempDirToUse, target, copyTask)); - + return copyTask; } } @@ -216,8 +216,7 @@ public class FileCache { DeleteProcess dp = new DeleteProcess(lock, entries, name, jobID); executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS); } - - + boolean holdsStillReference(String name, JobID jobId) { Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobId); if (jobEntries != null) { @@ -298,7 +297,7 @@ public class FileCache { // ------------------------------------------------------------------------ /** - * Asynchronous file copy process + * Asynchronous file copy process. 
*/ private static class CopyProcess implements Callable<Path> { @@ -333,8 +332,7 @@ public class FileCache { private final JobID jobID; public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries, - String name, JobID jobID) - { + String name, JobID jobID) { this.lock = lock; this.entries = entries; this.name = name; @@ -346,10 +344,10 @@ public class FileCache { try { synchronized (lock) { Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID); - + if (jobEntries != null) { Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name); - + if (entry != null) { int count = entry.f0; if (count > 1) { @@ -362,7 +360,7 @@ public class FileCache { if (jobEntries.isEmpty()) { entries.remove(jobID); } - + // abort the copy entry.f3.cancel(true); @@ -376,7 +374,7 @@ public class FileCache { LOG.error("Could not delete locally cached file " + file.getAbsolutePath()); } } - + // remove the job wide temp directory, if it is now empty File parent = entry.f1; if (parent.isDirectory()) { http://git-wip-us.apache.org/repos/asf/flink/blob/8ae4f2b0/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java index 4dca3db..89ab975 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java @@ -18,26 +18,25 @@ package org.apache.flink.runtime.filecache; -import java.io.File; -import java.io.IOException; -import java.util.concurrent.Future; - -import org.apache.flink.core.fs.Path; -import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; +import org.apache.flink.core.fs.Path; +import com.google.common.base.Charsets; +import com.google.common.io.Files; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; import org.junit.rules.TemporaryFolder; -import static org.junit.Assert.fail; -import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Future; + import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Test delete process of {@link FileCache}. The local cache file should not be deleted why another task comes in 5 seconds. @@ -64,7 +63,7 @@ public class FileCacheDeleteValidationTest { private FileCache fileCache; private File f; - + @Before public void setup() throws IOException { String[] tmpDirectories = new String[]{temporaryFolder.newFolder().getAbsolutePath()}; @@ -75,7 +74,7 @@ public class FileCacheDeleteValidationTest { e.printStackTrace(); fail("Cannot create FileCache: " + e.getMessage()); } - + f = temporaryFolder.newFile("cacheFile"); try { Files.write(testFileContent, f, Charsets.UTF_8); @@ -102,19 +101,19 @@ public class FileCacheDeleteValidationTest { try { final JobID jobID = new JobID(); final String fileName = "test_file"; - + final String filePath = f.toURI().toString(); - + // copy / create the file Future<Path> copyResult = fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID); copyResult.get(); - + // get another reference to the file Future<Path> copyResult2 = fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID); - + // this should be available immediately 
assertTrue(copyResult2.isDone()); - + // delete the file fileCache.deleteTmpFile(fileName, jobID); // file should not yet be deleted @@ -124,10 +123,10 @@ public class FileCacheDeleteValidationTest { fileCache.deleteTmpFile(fileName, jobID); // file should still not be deleted, but remain for a bit assertTrue(fileCache.holdsStillReference(fileName, jobID)); - + fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), jobID); fileCache.deleteTmpFile(fileName, jobID); - + // after a while, the file should disappear long deadline = System.currentTimeMillis() + 20000; do {
