Repository: flink
Updated Branches:
  refs/heads/master 8c7b3d9af -> 849990c00


[FLINK-7054] [blob] Remove LibraryCacheManager#getFile()

This was only used in tests where it is avoidable but if used anywhere else, it
may have caused cleanup issues.

This closes #4235.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/849990c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/849990c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/849990c0

Branch: refs/heads/master
Commit: 849990c00c1e7979b366088dbdba3f02a6123445
Parents: 8c7b3d9
Author: Nico Kruber <n...@data-artisans.com>
Authored: Wed Jun 21 14:45:31 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Aug 8 16:08:31 2017 +0200

----------------------------------------------------------------------
 .../librarycache/BlobLibraryCacheManager.java   | 36 +++++---------------
 .../FallbackLibraryCacheManager.java            |  9 +----
 .../librarycache/LibraryCacheManager.java       | 10 ------
 .../BlobLibraryCacheManagerTest.java            | 23 +++++++------
 .../BlobLibraryCacheRecoveryITCase.java         |  6 ++--
 5 files changed, 24 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/849990c0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 0c4cb85..0387725 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -18,7 +18,14 @@
 
 package org.apache.flink.runtime.execution.librarycache;
 
-import java.io.File;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.URL;
 import java.util.Arrays;
@@ -32,15 +39,6 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.blob.BlobService;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -51,8 +49,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * <p>
  * All files registered via {@link #registerJob(JobID, Collection, 
Collection)} are reference-counted
  * and are removed by a timer-based cleanup task if their reference counter is 
zero.
- * <strong>NOTE:</strong> this does not apply to files that enter the blob 
service via
- * {@link #getFile(BlobKey)}!
  */
 public final class BlobLibraryCacheManager extends TimerTask implements 
LibraryCacheManager {
 
@@ -202,22 +198,6 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
                }
        }
 
-       /**
-        * Returns a file handle to the file identified by the blob key.
-        * <p>
-        * <strong>NOTE:</strong> if not already registered during
-        * {@link #registerJob(JobID, Collection, Collection)}, files that 
enter the library cache /
-        * backing blob store using this method will not be reference-counted 
and garbage-collected!
-        *
-        * @param blobKey identifying the requested file
-        * @return File handle
-        * @throws IOException if any error occurs when retrieving the file
-        */
-       @Override
-       public File getFile(BlobKey blobKey) throws IOException {
-               return new File(blobService.getURL(blobKey).getFile());
-       }
-
        public int getBlobServerPort() {
                return blobService.getPort();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/849990c0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
index 1ef6e31..8e14e58 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
@@ -18,14 +18,12 @@
 
 package org.apache.flink.runtime.execution.librarycache;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
 
@@ -39,11 +37,6 @@ public class FallbackLibraryCacheManager implements 
LibraryCacheManager {
        }
 
        @Override
-       public File getFile(BlobKey blobKey) throws IOException {
-               throw new IOException("There is no file associated to the blob 
key " + blobKey);
-       }
-
-       @Override
        public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, 
Collection<URL> requiredClasspaths) {
                LOG.warn("FallbackLibraryCacheManager cannot download files 
associated with blob keys.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/849990c0/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index bf05271..5f9f443 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
@@ -37,15 +36,6 @@ public interface LibraryCacheManager {
        ClassLoader getClassLoader(JobID id);
 
        /**
-        * Returns a file handle to the file identified by the blob key.
-        *
-        * @param blobKey identifying the requested file
-        * @return File handle
-        * @throws IOException if any error occurs when retrieving the file
-        */
-       File getFile(BlobKey blobKey) throws IOException;
-
-       /**
         * Registers a job with its required jar files and classpaths. The jar 
files are identified by their blob keys.
         *
         * @param id job ID

http://git-wip-us.apache.org/repos/asf/flink/blob/849990c0/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 9d2bd55..606d8c9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
@@ -82,7 +83,7 @@ public class BlobLibraryCacheManagerTest {
                        libraryCacheManager = new 
BlobLibraryCacheManager(server, cleanupInterval);
                        libraryCacheManager.registerJob(jid, keys, 
Collections.<URL>emptyList());
 
-                       assertEquals(2, checkFilesExist(keys, 
libraryCacheManager, true));
+                       assertEquals(2, checkFilesExist(keys, server, true));
                        assertEquals(2, 
libraryCacheManager.getNumberOfCachedLibraries());
                        assertEquals(1, 
libraryCacheManager.getNumberOfReferenceHolders(jid));
 
@@ -104,7 +105,7 @@ public class BlobLibraryCacheManagerTest {
                        assertEquals(0, 
libraryCacheManager.getNumberOfReferenceHolders(jid));
 
                        // the blob cache should no longer contain the files
-                       assertEquals(0, checkFilesExist(keys, 
libraryCacheManager, false));
+                       assertEquals(0, checkFilesExist(keys, server, false));
 
                        try {
                                server.getURL(keys.get(0));
@@ -144,21 +145,21 @@ public class BlobLibraryCacheManagerTest {
         *
         * @param keys
         *              blob keys to check
-        * @param libraryCacheManager
-        *              cache manager to use
+        * @param blobService
+        *              BLOB store to use
         * @param doThrow
         *              whether exceptions should be ignored (<tt>false</tt>), 
or throws (<tt>true</tt>)
         *
-        * @return number of files we were able to retrieve via {@link 
BlobLibraryCacheManager#getFile(BlobKey)}
+        * @return number of files we were able to retrieve via {@link 
BlobService#getURL(BlobKey)}
         */
-       private int checkFilesExist(
-                       List<BlobKey> keys, BlobLibraryCacheManager 
libraryCacheManager, boolean doThrow)
+       private static int checkFilesExist(
+               List<BlobKey> keys, BlobService blobService, boolean doThrow)
                        throws IOException {
                int numFiles = 0;
 
                for (BlobKey key : keys) {
                        try {
-                               libraryCacheManager.getFile(key);
+                               blobService.getURL(key);
                                ++numFiles;
                        } catch (IOException e) {
                                if (doThrow) {
@@ -204,13 +205,13 @@ public class BlobLibraryCacheManagerTest {
                        libraryCacheManager.registerTask(jid, executionId1, 
keys, Collections.<URL>emptyList());
                        libraryCacheManager.registerTask(jid, executionId2, 
keys, Collections.<URL>emptyList());
 
-                       assertEquals(2, checkFilesExist(keys, 
libraryCacheManager, true));
+                       assertEquals(2, checkFilesExist(keys, server, true));
                        assertEquals(2, 
libraryCacheManager.getNumberOfCachedLibraries());
                        assertEquals(2, 
libraryCacheManager.getNumberOfReferenceHolders(jid));
 
                        libraryCacheManager.unregisterTask(jid, executionId1);
 
-                       assertEquals(2, checkFilesExist(keys, 
libraryCacheManager, true));
+                       assertEquals(2, checkFilesExist(keys, server, true));
                        assertEquals(2, 
libraryCacheManager.getNumberOfCachedLibraries());
                        assertEquals(1, 
libraryCacheManager.getNumberOfReferenceHolders(jid));
 
@@ -232,7 +233,7 @@ public class BlobLibraryCacheManagerTest {
                        assertEquals(0, 
libraryCacheManager.getNumberOfReferenceHolders(jid));
 
                        // the blob cache should no longer contain the files
-                       assertEquals(0, checkFilesExist(keys, 
libraryCacheManager, false));
+                       assertEquals(0, checkFilesExist(keys, server, false));
 
                        bc.close();
                } finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/849990c0/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 02f121b..e5efd19 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -107,7 +107,7 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                        libServer[0].registerTask(jobId, executionId, keys, 
Collections.<URL>emptyList());
 
                        // Verify key 1
-                       File f = libCache.getFile(keys.get(0));
+                       File f = new File(cache.getURL(keys.get(0)).toURI());
                        assertEquals(expected.length, f.length());
 
                        try (FileInputStream fis = new FileInputStream(f)) {
@@ -126,7 +126,7 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                        libCache = new BlobLibraryCacheManager(cache, 3600 * 
1000);
 
                        // Verify key 1
-                       f = libCache.getFile(keys.get(0));
+                       f = new File(cache.getURL(keys.get(0)).toURI());
                        assertEquals(expected.length, f.length());
 
                        try (FileInputStream fis = new FileInputStream(f)) {
@@ -138,7 +138,7 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                        }
 
                        // Verify key 2
-                       f = libCache.getFile(keys.get(1));
+                       f = new File(cache.getURL(keys.get(1)).toURI());
                        assertEquals(256, f.length());
 
                        try (FileInputStream fis = new FileInputStream(f)) {

Reply via email to