[FLINK-6820] Activate checkstyle for runtime/filecache

This closes #4062.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ae4f2b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ae4f2b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ae4f2b0

Branch: refs/heads/master
Commit: 8ae4f2b0ae103540435244ff75ab4b593670dd76
Parents: 92cd736
Author: zentol <[email protected]>
Authored: Fri Jun 2 21:06:16 2017 +0200
Committer: zentol <[email protected]>
Committed: Wed Jun 7 23:06:08 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  1 -
 .../flink/runtime/filecache/FileCache.java      | 36 +++++++++---------
 .../FileCacheDeleteValidationTest.java          | 39 ++++++++++----------
 3 files changed, 36 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ae4f2b0/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 3cee8d8..602f788 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -430,7 +430,6 @@ under the License.
                                                **/runtime/deployment/**,
                                                **/runtime/execution/**,
                                                **/runtime/executiongraph/**,
-                                               **/runtime/filecache/**,
                                                **/runtime/fs/**,
                                                **/runtime/heartbeat/**,
                                                **/runtime/highavailability/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae4f2b0/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 4f2166f..84b8feb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.filecache;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.java.tuple.Tuple4;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,8 +54,8 @@ import java.util.concurrent.TimeUnit;
 public class FileCache {
 
        static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
-       
-       /** cache-wide lock to ensure consistency. copies are not done under 
this lock */
+
+       /** cache-wide lock to ensure consistency. copies are not done under 
this lock. */
        private final Object lock = new Object();
 
        private final Map<JobID, Map<String, Tuple4<Integer, File, Path, 
Future<Path>>>> entries;
@@ -99,12 +99,12 @@ public class FileCache {
                this.shutdownHook = createShutdownHook(this, LOG);
 
                this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, 
File, Path, Future<Path>>>>();
-               this.executorService = Executors.newScheduledThreadPool(10, 
+               this.executorService = Executors.newScheduledThreadPool(10,
                                new ExecutorThreadFactory("flink-file-cache"));
        }
 
        /**
-        * Shuts down the file cache by cancelling all
+        * Shuts down the file cache by cancelling all.
         */
        public void shutdown() {
                synchronized (lock) {
@@ -119,9 +119,9 @@ public class FileCache {
                                        // may happen
                                }
                        }
-                       
+
                        entries.clear();
-                       
+
                        // clean up the all storage directories
                        for (File dir : storageDirectories) {
                                try {
@@ -172,7 +172,7 @@ public class FileCache {
                                // file is already in the cache. return a 
future that
                                // immediately returns the file
                                fileEntry.f0 = fileEntry.f0 + 1;
-                               
+
                                // return the future. may be that the copy is 
still in progress
                                return fileEntry.f3;
                        }
@@ -197,10 +197,10 @@ public class FileCache {
                                CopyProcess cp = new CopyProcess(entry, target);
                                FutureTask<Path> copyTask = new 
FutureTask<Path>(cp);
                                executorService.submit(copyTask);
-                               
+
                                // store our entry
                                jobEntries.put(name, new Tuple4<Integer, File, 
Path, Future<Path>>(1, tempDirToUse, target, copyTask));
-                               
+
                                return copyTask;
                        }
                }
@@ -216,8 +216,7 @@ public class FileCache {
                DeleteProcess dp = new DeleteProcess(lock, entries, name, 
jobID);
                executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS);
        }
-       
-       
+
        boolean holdsStillReference(String name, JobID jobId) {
                Map<String, Tuple4<Integer, File, Path, Future<Path>>> 
jobEntries = entries.get(jobId);
                if (jobEntries != null) {
@@ -298,7 +297,7 @@ public class FileCache {
        // 
------------------------------------------------------------------------
 
        /**
-        * Asynchronous file copy process
+        * Asynchronous file copy process.
         */
        private static class CopyProcess implements Callable<Path> {
 
@@ -333,8 +332,7 @@ public class FileCache {
                private final JobID jobID;
 
                public DeleteProcess(Object lock, Map<JobID, Map<String, 
Tuple4<Integer, File, Path, Future<Path>>>> entries,
-                                                               String name, 
JobID jobID)
-               {
+                                                               String name, 
JobID jobID) {
                        this.lock = lock;
                        this.entries = entries;
                        this.name = name;
@@ -346,10 +344,10 @@ public class FileCache {
                        try {
                                synchronized (lock) {
                                        Map<String, Tuple4<Integer, File, Path, 
Future<Path>>> jobEntries = entries.get(jobID);
-                                       
+
                                        if (jobEntries != null) {
                                                Tuple4<Integer, File, Path, 
Future<Path>> entry = jobEntries.get(name);
-                                               
+
                                                if (entry != null) {
                                                        int count = entry.f0;
                                                        if (count > 1) {
@@ -362,7 +360,7 @@ public class FileCache {
                                                                if 
(jobEntries.isEmpty()) {
                                                                        
entries.remove(jobID);
                                                                }
-                                                               
+
                                                                // abort the 
copy
                                                                
entry.f3.cancel(true);
 
@@ -376,7 +374,7 @@ public class FileCache {
                                                                                
LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
                                                                        }
                                                                }
-                                                               
+
                                                                // remove the 
job wide temp directory, if it is now empty
                                                                File parent = 
entry.f1;
                                                                if 
(parent.isDirectory()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ae4f2b0/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
index 4dca3db..89ab975 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
@@ -18,26 +18,25 @@
 
 package org.apache.flink.runtime.filecache;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.Future;
-
-import org.apache.flink.core.fs.Path;
-import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.JobID;
+import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
+import org.apache.flink.core.fs.Path;
 
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
 import org.junit.rules.TemporaryFolder;
 
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.Future;
+
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Test delete process of {@link FileCache}. The local cache file should not 
be deleted why another task comes in 5 seconds.
@@ -64,7 +63,7 @@ public class FileCacheDeleteValidationTest {
 
        private FileCache fileCache;
        private File f;
-       
+
        @Before
        public void setup() throws IOException {
                String[] tmpDirectories = new 
String[]{temporaryFolder.newFolder().getAbsolutePath()};
@@ -75,7 +74,7 @@ public class FileCacheDeleteValidationTest {
                        e.printStackTrace();
                        fail("Cannot create FileCache: " + e.getMessage());
                }
-               
+
                f = temporaryFolder.newFile("cacheFile");
                try {
                        Files.write(testFileContent, f, Charsets.UTF_8);
@@ -102,19 +101,19 @@ public class FileCacheDeleteValidationTest {
                try {
                        final JobID jobID = new JobID();
                        final String fileName = "test_file";
-                       
+
                        final String filePath = f.toURI().toString();
-                       
+
                        // copy / create the file
                        Future<Path> copyResult = 
fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), 
jobID);
                        copyResult.get();
-                       
+
                        // get another reference to the file
                        Future<Path> copyResult2 = 
fileCache.createTmpFile(fileName, new DistributedCacheEntry(filePath, false), 
jobID);
-                       
+
                        // this should be available immediately
                        assertTrue(copyResult2.isDone());
-                       
+
                        // delete the file
                        fileCache.deleteTmpFile(fileName, jobID);
                        // file should not yet be deleted
@@ -124,10 +123,10 @@ public class FileCacheDeleteValidationTest {
                        fileCache.deleteTmpFile(fileName, jobID);
                        // file should still not be deleted, but remain for a 
bit
                        assertTrue(fileCache.holdsStillReference(fileName, 
jobID));
-                       
+
                        fileCache.createTmpFile(fileName, new 
DistributedCacheEntry(filePath, false), jobID);
                        fileCache.deleteTmpFile(fileName, jobID);
-                       
+
                        // after a while, the file should disappear
                        long deadline = System.currentTimeMillis() + 20000;
                        do {

Reply via email to