[FLINK-7057][blob] move ref-counting from the LibraryCacheManager to the BlobCache

Also change from BlobKey-based ref-counting to job-based ref-counting, which
is simpler and the mode we want to use from now on. Deferred cleanup (as
before) is not implemented yet (TODO).
The BlobServer does not use ref-counting; instead, cleanup happens when the
job enters a final state (TODO).
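
A minimal usage sketch of the job-based ref-counting from the cache's point
of view (blobCache, jobId and blobKey are assumed to exist; the API itself is
part of the BlobCache changes below):

    blobCache.registerJob(jobId);      // pin the job's BLOBs
    try {
        File jar = blobCache.getFile(jobId, blobKey);
        // ... use the local copy of the BLOB ...
    } finally {
        blobCache.releaseJob(jobId);   // BLOBs may be cleaned up once the
                                       // job is no longer referenced
    }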

[FLINK-7057][blob] change to a cleaner API for BlobService#registerJob()

[FLINK-7057][blob] implement deferred cleanup at the BlobCache

Whenever a job is no longer referenced at the BlobCache, we set a TTL and let
the periodic cleanup task remove its BLOBs once that TTL has passed. For now,
this means that a BLOB will be retained for at most
(2 * ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) seconds after it
is no longer referenced. We do this so that a recovery still has the chance
to use existing files rather than download them again.
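
A sketch of the TTL bookkeeping (field and variable names follow the RefCount
bookkeeping added to BlobCache in this commit):

    // releaseJob(): start the grace period instead of deleting immediately
    // once the last reference is gone
    --ref.references;
    if (ref.references == 0) {
        ref.keepUntil = System.currentTimeMillis() + cleanupInterval;
    }

    // periodic cleanup task: only delete after the grace period has passed
    if (ref.references <= 0 && ref.keepUntil > 0
            && System.currentTimeMillis() >= ref.keepUntil) {
        // delete the job's local BLOB directory
    }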

[FLINK-7057][blob] integrate cleanup of job-related JARs from the BlobServer

TODO: an integration test that verifies that this cleanup is actually
performed when desired and skipped when it should not happen, e.g. if the job
did not reach a final execution state

[FLINK-7057][tests] extract FailingBlockingInvokable from CoordinatorShutdownTest

[FLINK-7057][blob] add an integration test for the BlobServer cleanup

This ensures that BLOB files are actually deleted when a job enters a final
state.

[FLINK-7057][tests] refrain from catching an exception just to fail the test

This removes code like the following from the BLOB store unit tests:

catch (Exception e) {
    e.printStackTrace();
    fail(e.getMessage());
}
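
Instead, the test methods simply declare the exception, e.g. (a sketch
assuming JUnit 4; the test name is made up):

    @Test
    public void testDeleteBlob() throws Exception {
        // an exception now fails the test directly and keeps its full
        // stack trace instead of being swallowed by fail()
    }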

[FLINK-7057][blob] fix BlobServer#cleanupJob() being too eager

Instead of deleting the job's directory, it was deleting the parent storage
directory.

[FLINK-7057][blob] fix the BlobServer cleanup integration test

* the test did not check the correct directories for cleanup
* the test did not honour the test timeout

[FLINK-7057][blob] test and fix BlobServer cleanup for a failed job submission

[FLINK-7057][blob] rework the LibraryCacheManager API

Since ref-counting has moved to the BlobCache, the BlobLibraryCacheManager is
just a thin wrapper that creates a user class loader from BLOBs retrieved via
the BlobCache/BlobServer. Therefore, also move job registration/release out
of it and restrict job registration to the task manager side where the
BlobCache is used (on the BlobServer, jobs do not need registration since
they run only once and their BLOBs are deleted when the job enters a final
state).

This makes the BlobServer and BlobCache instances available at the JobManager
and TaskManager, respectively, which also enables future use cases outside of
the LibraryCacheManager.
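
A sketch of the reworked usage on the task manager side (jobId, attemptId,
requiredJarFiles and requiredClasspaths are assumed to come from the task
deployment; getClassLoader(JobID) is the existing accessor):

    LibraryCacheManager libCache = new BlobLibraryCacheManager(blobCache);
    libCache.registerTask(
        jobId, attemptId, requiredJarFiles, requiredClasspaths);
    ClassLoader userCodeClassLoader = libCache.getClassLoader(jobId);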

[FLINK-7057][blob] address PR comments

[FLINK-7057][blob] fix JobManagerLeaderElectionTest

[FLINK-7057][blob] re-introduce some ref-counting for BlobLibraryCacheManager

Apparently, we do need to return the same ClassLoader for different
(parallel) tasks of a job running on the same task manager. Therefore,
restore the task registration implementation that was removed with
8331fbb208d975e0c1ec990344c14315ea08dd4a and adapt it here. This also
restores some tests and adds new combinations that were not tested before.
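
A sketch of the restored contract (attempt IDs and BLOB keys are made up for
illustration):

    Collection<URL> noClasspaths = Collections.emptyList();
    libCache.registerTask(jobId, attempt1, keys, noClasspaths);
    ClassLoader cl1 = libCache.getClassLoader(jobId);
    libCache.registerTask(jobId, attempt2, keys, noClasspaths);
    // both (parallel) tasks must see the very same class loader:
    assert cl1 == libCache.getClassLoader(jobId);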

[FLINK-7057][blob] address PR comments

[FLINK-7057][tests] fix (manual/ignored) BlobCacheCleanupTest#testJobDeferredCleanup()

[FLINK-7057][hotfix] fix a checkstyle error

[FLINK-7057][blob] remove the extra lock object from BlobCache

We can lock on jobRefCounters instead, which is what we are guarding anyway.

[FLINK-7057][blob] minor improvements to the TTL in BlobCache

Do not use Long.MAX_VALUE as a sentinel for "keep forever". Also add more
comments.

[FLINK-7057][blob] replace "library-cache-manager.cleanup.interval" with "blob.service.cleanup.interval"

Since the cleanup has moved to the BLOB service classes, this name makes more
sense.
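
For illustration, a sketch of setting and reading the renamed option (the
deprecated key remains usable as a fallback):

    Configuration config = new Configuration();
    config.setLong("blob.service.cleanup.interval", 1800); // seconds
    long cleanupIntervalSecs =
        config.getLong(BlobServerOptions.CLEANUP_INTERVAL);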

[FLINK-7057][hotfix] remove an unused import

[FLINK-7057][docs] adapt javadocs of JobManager descendants

[FLINK-7057][blob] increase JobManagerCleanupITCase timeout

The previous value of 15s seems to be too low for some runs on Travis.

[FLINK-7057][blob] provide more debug output in JobManagerCleanupITCase

If the BlobServer's directory is not cleaned up within the remaining time,
also print which files remain. This may help in debugging the situation.

This closes #4238.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b236240
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b236240
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b236240

Branch: refs/heads/master
Commit: 7b23624066c46d58c7b7181e5576a9834af9ac7a
Parents: 9c80d40
Author: Nico Kruber <n...@data-artisans.com>
Authored: Tue Jun 27 18:29:44 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Aug 18 09:29:32 2017 +0200

----------------------------------------------------------------------
 docs/ops/config.md                              |   7 +
 .../flink/configuration/BlobServerOptions.java  |  16 +-
 .../flink/configuration/ConfigConstants.java    |   9 +-
 .../clusterframework/MesosJobManager.scala      |   8 +-
 .../apache/flink/runtime/blob/BlobCache.java    | 140 ++++-
 .../apache/flink/runtime/blob/BlobClient.java   |  21 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  40 +-
 .../runtime/blob/BlobServerConnection.java      |  18 +-
 .../apache/flink/runtime/blob/BlobService.java  |   5 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |  25 +-
 .../apache/flink/runtime/client/JobClient.java  |   3 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |   3 +-
 .../dispatcher/StandaloneDispatcher.java        |   5 +-
 .../entrypoint/JobClusterEntrypoint.java        |   3 +-
 .../librarycache/BlobLibraryCacheManager.java   | 327 ++++++-----
 .../FallbackLibraryCacheManager.java            |   8 +-
 .../librarycache/LibraryCacheManager.java       |  25 +-
 .../apache/flink/runtime/jobgraph/JobGraph.java |   2 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   7 +-
 .../runtime/jobmaster/JobManagerServices.java   |  20 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  14 +-
 .../taskexecutor/JobManagerConnection.java      |  32 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  11 +-
 .../taskexecutor/TaskManagerConfiguration.java  |  12 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  13 +-
 .../ContaineredJobManager.scala                 |   6 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  48 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |   5 +
 .../flink/runtime/taskmanager/TaskManager.scala |  34 +-
 .../runtime/blob/BlobCacheCleanupTest.java      | 328 +++++++++++
 .../runtime/blob/BlobCacheRetriesTest.java      |   4 +-
 .../flink/runtime/blob/BlobClientTest.java      |  29 +-
 .../apache/flink/runtime/blob/BlobKeyTest.java  |   6 +-
 .../runtime/blob/BlobServerDeleteTest.java      |  85 ++-
 .../flink/runtime/blob/BlobUtilsTest.java       |   3 +-
 .../checkpoint/CoordinatorShutdownTest.java     |  23 +-
 .../runtime/dispatcher/DispatcherTest.java      |   3 +-
 .../BlobLibraryCacheManagerTest.java            | 540 +++++++++++++------
 .../BlobLibraryCacheRecoveryITCase.java         |  36 +-
 .../jobmanager/JobManagerCleanupITCase.java     | 300 +++++++++++
 .../jobmanager/JobManagerHARecoveryTest.java    |  20 +-
 .../flink/runtime/jobmanager/JobSubmitTest.java |   5 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  |   9 +-
 .../JobManagerLeaderElectionTest.java           |  11 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   5 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   3 +
 .../flink/runtime/taskmanager/TaskStopTest.java |   2 +
 .../flink/runtime/taskmanager/TaskTest.java     |  35 +-
 .../testtasks/FailingBlockingInvokable.java     |  48 ++
 .../runtime/util/JvmExitOnFatalErrorTest.java   |   2 +
 .../jobmanager/JobManagerRegistrationTest.scala |   5 +-
 .../runtime/testingUtils/TestingCluster.scala   |   3 +
 .../testingUtils/TestingJobManager.scala        |   9 +-
 .../runtime/tasks/BlockingCheckpointsTest.java  |   2 +
 .../tasks/InterruptSensitiveRestoreTest.java    |   2 +
 .../tasks/StreamTaskTerminationTest.java        |   2 +
 .../streaming/runtime/tasks/StreamTaskTest.java |   3 +
 .../flink/yarn/TestingYarnJobManager.scala      |   3 +
 .../org/apache/flink/yarn/YarnJobManager.scala  |   6 +-
 59 files changed, 1759 insertions(+), 640 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index 4138b4d..e0b9d4d 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -196,6 +196,13 @@ will be used under the directory specified by 
jobmanager.web.tmpdir.
 
 - `blob.storage.directory`: Directory for storing blobs (such as user JARs) on 
the TaskManagers.
 
+- `blob.service.cleanup.interval`: Cleanup interval (in seconds) of the blob 
caches (DEFAULT: 1 hour).
+Whenever a job is not referenced at the cache anymore, we set a TTL and let 
the periodic cleanup task
+(executed every `blob.service.cleanup.interval` seconds) remove its blob files 
after this TTL has passed.
+This means that a blob will be retained at most <tt>2 * 
`blob.service.cleanup.interval`</tt> seconds after
+not being referenced anymore. Therefore, a recovery still has the chance to 
use existing files rather
+than to download them again.
+
 - `blob.server.port`: Port definition for the blob server (serving user JARs) 
on the TaskManagers. By default the port is set to 0, which means that the 
operating system is picking an ephemeral port. Flink also accepts a list of 
ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is 
recommended to set a range of ports to avoid collisions when multiple 
JobManagers are running on the same machine.
 
 - `blob.service.ssl.enabled`: Flag to enable ssl for the blob client/server 
communication. This is applicable only when the global ssl flag 
security.ssl.enabled is set to true (DEFAULT: true).

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
index e27c29f..019580a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/BlobServerOptions.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import static org.apache.flink.configuration.ConfigOptions.key;
 
 /**
- * Configuration options for the BlobServer.
+ * Configuration options for the BlobServer and BlobCache.
  */
 @PublicEvolving
 public class BlobServerOptions {
@@ -73,4 +73,18 @@ public class BlobServerOptions {
        public static final ConfigOption<Boolean> SSL_ENABLED =
                key("blob.service.ssl.enabled")
                        .defaultValue(true);
+
+       /**
+        * Cleanup interval of the blob caches at the task managers (in 
seconds).
+        *
+        * <p>Whenever a job is not referenced at the cache anymore, we set a 
TTL and let the periodic
+        * cleanup task (executed every CLEANUP_INTERVAL seconds) remove its 
blob files after this TTL
+        * has passed. This means that a blob will be retained at most <tt>2 * 
CLEANUP_INTERVAL</tt>
+        * seconds after not being referenced anymore. Therefore, a recovery 
still has the chance to use
+        * existing files rather than to download them again.
+        */
+       public static final ConfigOption<Long> CLEANUP_INTERVAL =
+               key("blob.service.cleanup.interval")
+                       .defaultValue(3_600L) // once per hour
+                       
.withDeprecatedKeys("library-cache-manager.cleanup.interval");
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 4c6c62a..4153e45 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -178,7 +178,10 @@ public final class ConfigConstants {
 
        /**
         * The config parameter defining the cleanup interval of the library 
cache manager.
+        *
+        * @deprecated use {@link BlobServerOptions#CLEANUP_INTERVAL} instead
         */
+       @Deprecated
        public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = 
"library-cache-manager.cleanup.interval";
 
        /**
@@ -1253,8 +1256,12 @@ public final class ConfigConstants {
 
        /**
         * The default library cache manager cleanup interval in seconds
+        *
+        * @deprecated use {@link BlobServerOptions#CLEANUP_INTERVAL} instead
         */
-       public static final long DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL 
= 3600;
+       @Deprecated
+       public static final long DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL 
=
+               BlobServerOptions.CLEANUP_INTERVAL.defaultValue();
        
        /**
         * The default network port to connect to for communication with the 
job manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index 3e7c55f..f854a1e 100644
--- 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{Executor, 
ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -34,7 +35,7 @@ import org.apache.flink.runtime.metrics.{MetricRegistry => 
FlinkMetricRegistry}
 
 import scala.concurrent.duration._
 
-/** JobManager actor for execution on Mesos. .
+/** JobManager actor for execution on Mesos.
   *
   * @param flinkConfiguration Configuration object for the actor
   * @param futureExecutor Execution context which is used to execute 
concurrent tasks in the
@@ -43,7 +44,8 @@ import scala.concurrent.duration._
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
-  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param blobServer BLOB store for file uploads
+  * @param libraryCacheManager manages uploaded jar files and class paths
   * @param archive Archive for finished Flink jobs
   * @param restartStrategyFactory Restart strategy to be used in case of a job 
recovery
   * @param timeout Timeout for futures
@@ -55,6 +57,7 @@ class MesosJobManager(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -70,6 +73,7 @@ class MesosJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 29f7706..c50a888 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
@@ -25,7 +26,6 @@ import org.apache.flink.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -33,6 +33,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -47,7 +52,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * download it from a distributed file system (if available) or the BLOB
  * server.</p>
  */
-public final class BlobCache implements BlobService {
+public class BlobCache extends TimerTask implements BlobService {
 
        /** The log object used for debugging. */
        private static final Logger LOG = 
LoggerFactory.getLogger(BlobCache.class);
@@ -71,6 +76,32 @@ public final class BlobCache implements BlobService {
        /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
        private final Configuration blobClientConfig;
 
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Job reference counters with a time-to-live (TTL).
+        */
+       private static class RefCount {
+               /**
+                * Number of references to a job.
+                */
+               public int references = 0;
+               
+               /**
+                * Timestamp in milliseconds when any job data should be 
cleaned up (no cleanup for
+                * non-positive values).
+                */
+               public long keepUntil = -1;
+       }
+
+       /** Map to store the number of references to a specific job */
+       private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
+
+       /** Time interval (ms) to run the cleanup task; also used as the 
default TTL. */
+       private final long cleanupInterval;
+
+       private final Timer cleanupTimer;
+
        /**
         * Instantiates a new BLOB cache.
         *
@@ -108,11 +139,63 @@ public final class BlobCache implements BlobService {
                        this.numFetchRetries = 0;
                }
 
+               // Initializing the clean up task
+               this.cleanupTimer = new Timer(true);
+
+               cleanupInterval = 
blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+               this.cleanupTimer.schedule(this, cleanupInterval, 
cleanupInterval);
+
                // Add shutdown hook to delete storage directory
                shutdownHook = BlobUtils.addShutdownHook(this, LOG);
        }
 
        /**
+        * Registers use of job-related BLOBs.
+        * <p>
+        * Using any other method to access BLOBs, e.g. {@link #getFile}, is 
only valid within calls
+        * to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}.
+        *
+        * @param jobId
+        *              ID of the job this blob belongs to
+        *
+        * @see #releaseJob(JobID)
+        */
+       public void registerJob(JobID jobId) {
+               synchronized (jobRefCounters) {
+                       RefCount ref = jobRefCounters.get(jobId);
+                       if (ref == null) {
+                               ref = new RefCount();
+                               jobRefCounters.put(jobId, ref);
+                       }
+                       ++ref.references;
+               }
+       }
+
+       /**
+        * Unregisters use of job-related BLOBs and allow them to be released.
+        *
+        * @param jobId
+        *              ID of the job this blob belongs to
+        *
+        * @see #registerJob(JobID)
+        */
+       public void releaseJob(JobID jobId) {
+               synchronized (jobRefCounters) {
+                       RefCount ref = jobRefCounters.get(jobId);
+
+                       if (ref == null) {
+                               LOG.warn("improper use of releaseJob() without 
a matching number of registerJob() calls");
+                               return;
+                       }
+
+                       --ref.references;
+                       if (ref.references == 0) {
+                               ref.keepUntil = System.currentTimeMillis() + 
cleanupInterval;
+                       }
+               }
+       }
+
+       /**
         * Returns local copy of the (job-unrelated) file for the BLOB with the 
given key.
         * <p>
         * The method will first attempt to serve the BLOB from its local 
cache. If the BLOB is not in
@@ -148,7 +231,7 @@ public final class BlobCache implements BlobService {
         *              Thrown if an I/O error occurs while downloading the 
BLOBs from the BLOB server.
         */
        @Override
-       public File getFile(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
+       public File getFile(JobID jobId, BlobKey key) throws IOException {
                checkNotNull(jobId);
                return getFileInternal(jobId, key);
        }
@@ -258,7 +341,7 @@ public final class BlobCache implements BlobService {
         * @throws IOException
         */
        @Override
-       public void delete(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
+       public void delete(JobID jobId, BlobKey key) throws IOException {
                checkNotNull(jobId);
                deleteInternal(jobId, key);
        }
@@ -307,7 +390,7 @@ public final class BlobCache implements BlobService {
         *              thrown if an I/O error occurs while transferring the 
request to the BLOB server or if the
         *              BLOB server cannot delete the file
         */
-       public void deleteGlobal(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
+       public void deleteGlobal(JobID jobId, BlobKey key) throws IOException {
                checkNotNull(jobId);
                deleteGlobalInternal(jobId, key);
        }
@@ -341,8 +424,40 @@ public final class BlobCache implements BlobService {
                return serverAddress.getPort();
        }
 
+       /**
+        * Cleans up BLOBs which are not referenced anymore.
+        */
+       @Override
+       public void run() {
+               synchronized (jobRefCounters) {
+                       Iterator<Map.Entry<JobID, RefCount>> entryIter = 
jobRefCounters.entrySet().iterator();
+                       final long currentTimeMillis = 
System.currentTimeMillis();
+
+                       while (entryIter.hasNext()) {
+                               Map.Entry<JobID, RefCount> entry = 
entryIter.next();
+                               RefCount ref = entry.getValue();
+
+                               if (ref.references <= 0 && ref.keepUntil > 0 && 
currentTimeMillis >= ref.keepUntil) {
+                                       JobID jobId = entry.getKey();
+
+                                       final File localFile =
+                                               new 
File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId));
+                                       try {
+                                               
FileUtils.deleteDirectory(localFile);
+                                               // let's only remove this 
directory from cleanup if the cleanup was successful
+                                               entryIter.remove();
+                                       } catch (Throwable t) {
+                                               LOG.warn("Failed to locally 
delete job directory " + localFile.getAbsolutePath(), t);
+                                       }
+                               }
+                       }
+               }
+       }
+
        @Override
        public void close() throws IOException {
+               cleanupTimer.cancel();
+
                if (shutdownRequested.compareAndSet(false, true)) {
                        LOG.info("Shutting down BlobCache");
 
@@ -369,8 +484,19 @@ public final class BlobCache implements BlobService {
                return new BlobClient(serverAddress, blobClientConfig);
        }
 
-       public File getStorageDir() {
-               return this.storageDir;
+       /**
+        * Returns a file handle to the file associated with the given blob key 
on the blob
+        * server.
+        *
+        * <p><strong>This is only called from the {@link 
BlobServerConnection}</strong>
+        *
+        * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if 
job-unrelated)
+        * @param key identifying the file
+        * @return file handle to the file
+        */
+       @VisibleForTesting
+       public File getStorageLocation(JobID jobId, BlobKey key) {
+               return BlobUtils.getStorageLocation(storageDir, jobId, key);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 9a2f59e..8f1487a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -30,7 +30,6 @@ import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLParameters;
@@ -166,7 +165,7 @@ public final class BlobClient implements Closeable {
         * @throws IOException
         *              if an I/O error occurs during the download
         */
-       public InputStream get(@Nonnull JobID jobId, BlobKey blobKey) throws 
IOException {
+       public InputStream get(JobID jobId, BlobKey blobKey) throws IOException 
{
                checkNotNull(jobId);
                return getInternal(jobId, blobKey);
        }
@@ -339,7 +338,7 @@ public final class BlobClient implements Closeable {
         *              thrown if an I/O error occurs while reading the data 
from the input stream or uploading the
         *              data to the BLOB server
         */
-       public BlobKey put(@Nonnull JobID jobId, InputStream inputStream) 
throws IOException {
+       public BlobKey put(JobID jobId, InputStream inputStream) throws 
IOException {
                checkNotNull(jobId);
                return putInputStream(jobId, inputStream);
        }
@@ -369,7 +368,7 @@ public final class BlobClient implements Closeable {
                checkNotNull(value);
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("PUT BLOB buffer ({} bytes) to {}.", len, 
socket.getLocalSocketAddress());
+                       LOG.debug("PUT BLOB buffer (" + len + " bytes) to " + 
socket.getLocalSocketAddress() + ".");
                }
 
                try {
@@ -556,7 +555,7 @@ public final class BlobClient implements Closeable {
         *              thrown if an I/O error occurs while transferring the 
request to the BLOB server or if the
         *              BLOB server cannot delete the file
         */
-       public void delete(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
+       public void delete(JobID jobId, BlobKey key) throws IOException {
                checkNotNull(jobId);
                deleteInternal(jobId, key);
        }
@@ -603,23 +602,21 @@ public final class BlobClient implements Closeable {
 
        /**
         * Uploads the JAR files to a {@link BlobServer} at the given address.
-        * <p>
-        * TODO: add jobId to signature after adapting the 
BlobLibraryCacheManager
         *
         * @param serverAddress
         *              Server address of the {@link BlobServer}
         * @param clientConfig
         *              Any additional configuration for the blob client
+        * @param jobId
+        *              ID of the job this blob belongs to (or <tt>null</tt> if 
job-unrelated)
         * @param jars
         *              List of JAR files to upload
         *
         * @throws IOException
         *              if the upload fails
         */
-       public static List<BlobKey> uploadJarFiles(
-                       InetSocketAddress serverAddress,
-                       Configuration clientConfig,
-                       List<Path> jars) throws IOException {
+       public static List<BlobKey> uploadJarFiles(InetSocketAddress 
serverAddress,
+                       Configuration clientConfig, JobID jobId, List<Path> 
jars) throws IOException {
+               checkNotNull(jobId);
                if (jars.isEmpty()) {
                        return Collections.emptyList();
                } else {
@@ -631,7 +628,7 @@ public final class BlobClient implements Closeable {
                                        FSDataInputStream is = null;
                                        try {
                                                is = fs.open(jar);
-                                               final BlobKey key = 
blobClient.putInputStream(null, is);
+                                               final BlobKey key = 
blobClient.putInputStream(jobId, is);
                                                blobKeys.add(key);
                                        } finally {
                                                if (is != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 43a060a..bfcf881 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
@@ -29,7 +30,6 @@ import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import java.io.File;
@@ -196,7 +196,8 @@ public class BlobServer extends Thread implements 
BlobService {
         * @param key identifying the file
         * @return file handle to the file
         */
-       File getStorageLocation(JobID jobId, BlobKey key) {
+       @VisibleForTesting
+       public File getStorageLocation(JobID jobId, BlobKey key) {
                return BlobUtils.getStorageLocation(storageDir, jobId, key);
        }
 
@@ -374,7 +375,7 @@ public class BlobServer extends Thread implements 
BlobService {
         *              Thrown if the file retrieval failed.
         */
        @Override
-       public File getFile(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
+       public File getFile(JobID jobId, BlobKey key) throws IOException {
                checkNotNull(jobId);
                return getFileInternal(jobId, key);
        }
@@ -450,7 +451,7 @@ public class BlobServer extends Thread implements 
BlobService {
         * @throws IOException
         */
        @Override
-       public void delete(@Nonnull JobID jobId, BlobKey key) throws 
IOException {
+       public void delete(JobID jobId, BlobKey key) throws IOException {
                checkNotNull(jobId);
                deleteInternal(jobId, key);
        }
@@ -483,6 +484,37 @@ public class BlobServer extends Thread implements 
BlobService {
        }
 
        /**
+        * Removes all BLOBs from local and HA store belonging to the given job 
ID.
+        *
+        * @param jobId
+        *              ID of the job this blob belongs to
+        */
+       public void cleanupJob(JobID jobId) {
+               checkNotNull(jobId);
+
+               final File jobDir =
+                       new 
File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId));
+
+               readWriteLock.writeLock().lock();
+
+               try {
+                       // delete locally
+                       try {
+                               FileUtils.deleteDirectory(jobDir);
+                       } catch (IOException e) {
+                               LOG.warn("Failed to locally delete BLOB storage 
directory at " +
+                                       jobDir.getAbsolutePath(), e);
+                       }
+
+                       // delete in HA store
+                       blobStore.deleteAll(jobId);
+               } finally {
+                       readWriteLock.writeLock().unlock();
+               }
+       }
+
+
+       /**
         * Returns the port on which the server is listening.
         *
         * @return port on which the server is listening

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index f1054c0..7f617f9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -139,14 +139,7 @@ class BlobServerConnection extends Thread {
                        LOG.error("Error while executing BLOB connection.", t);
                }
                finally {
-                       try {
-                               if (clientSocket != null) {
-                                       clientSocket.close();
-                               }
-                       } catch (Throwable t) {
-                               LOG.debug("Exception while closing BLOB server 
connection socket.", t);
-                       }
-
+                       closeSilently(clientSocket, LOG);
                        blobServer.unregisterConnection(this);
                }
        }
@@ -433,9 +426,8 @@ class BlobServerConnection extends Thread {
                        final InputStream inputStream, final File incomingFile, 
final byte[] buf)
                        throws IOException {
                MessageDigest md = BlobUtils.createMessageDigest();
-               FileOutputStream fos = new FileOutputStream(incomingFile);
 
-               try {
+               try (FileOutputStream fos = new FileOutputStream(incomingFile)) 
{
                        while (true) {
                                final int bytesExpected = 
readLength(inputStream);
                                if (bytesExpected == -1) {
@@ -453,12 +445,6 @@ class BlobServerConnection extends Thread {
                                md.update(buf, 0, bytesExpected);
                        }
                        return new BlobKey(md.digest());
-               } finally {
-                       try {
-                               fos.close();
-                       } catch (Throwable t) {
-                               LOG.warn("Cannot close stream to BLOB staging 
file", t);
-                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index a78c88c..0db5a58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
 
-import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
@@ -50,7 +49,7 @@ public interface BlobService extends Closeable {
         * @throws java.io.FileNotFoundException when the path does not exist;
         * @throws IOException if any other error occurs when retrieving the 
file
         */
-       File getFile(@Nonnull JobID jobId, BlobKey key) throws IOException;
+       File getFile(JobID jobId, BlobKey key) throws IOException;
 
        /**
         * Deletes the (job-unrelated) file associated with the provided blob 
key.
@@ -67,7 +66,7 @@ public interface BlobService extends Closeable {
         * @param key associated with the file to be deleted
         * @throws IOException
         */
-       void delete(@Nonnull JobID jobId, BlobKey key) throws IOException;
+       void delete(JobID jobId, BlobKey key) throws IOException;
 
        /**
         * Returns the port of the blob service.

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 9b5724b..dabd1bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -29,8 +29,8 @@ import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
@@ -175,7 +175,7 @@ public class BlobUtils {
        static File getIncomingDirectory(File storageDir) {
                final File incomingDir = new File(storageDir, "incoming");
 
-               mkdirTolerateExisting(incomingDir, "incoming");
+               mkdirTolerateExisting(incomingDir);
 
                return incomingDir;
        }
@@ -185,15 +185,13 @@ public class BlobUtils {
         *
         * @param dir
         *              directory to create
-        * @param dirType
-        *              the type of the directory (included in error message if 
something fails)
         */
-       private static void mkdirTolerateExisting(final File dir, final String 
dirType) {
+       private static void mkdirTolerateExisting(final File dir) {
                // note: thread-safe create should try to mkdir first and then 
ignore the case that the
                //       directory already existed
                if (!dir.mkdirs() && !dir.exists()) {
                        throw new RuntimeException(
-                               "Cannot create " + dirType + " directory '" + 
dir.getAbsolutePath() + "'.");
+                               "Cannot create directory '" + 
dir.getAbsolutePath() + "'.");
                }
        }
 
@@ -210,10 +208,10 @@ public class BlobUtils {
         * @return the (designated) physical storage location of the BLOB
         */
        static File getStorageLocation(
-                       @Nonnull File storageDir, @Nullable JobID jobId, 
@Nonnull BlobKey key) {
+                       File storageDir, @Nullable JobID jobId, BlobKey key) {
                File file = new 
File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
 
-               mkdirTolerateExisting(file.getParentFile(), "cache");
+               mkdirTolerateExisting(file.getParentFile());
 
                return file;
        }
@@ -229,7 +227,7 @@ public class BlobUtils {
         *
         * @return the storage directory for BLOBs belonging to the job with 
the given ID
         */
-       static String getStorageLocationPath(@Nonnull String storageDir, 
@Nullable JobID jobId) {
+       static String getStorageLocationPath(String storageDir, @Nullable JobID 
jobId) {
                if (jobId == null) {
                        // format: $base/no_job
                        return String.format("%s/%s", storageDir, 
NO_JOB_DIR_PREFIX);
@@ -256,7 +254,7 @@ public class BlobUtils {
         * @return the path to the given BLOB
         */
        static String getStorageLocationPath(
-                       @Nonnull String storageDir, @Nullable JobID jobId, 
@Nonnull BlobKey key) {
+                       String storageDir, @Nullable JobID jobId, BlobKey key) {
                if (jobId == null) {
                        // format: $base/no_job/blob_$key
                        return String.format("%s/%s/%s%s",
@@ -273,7 +271,6 @@ public class BlobUtils {
         *
         * @return a new instance of the message digest to use for the BLOB key 
computation
         */
-       @Nonnull
        static MessageDigest createMessageDigest() {
                try {
                        return MessageDigest.getInstance(HASHING_ALGORITHM);
@@ -285,7 +282,7 @@ public class BlobUtils {
        /**
         * Adds a shutdown hook to the JVM and returns the Thread, which has 
been registered.
         */
-       static Thread addShutdownHook(final BlobService service, final Logger 
logger) {
+       static Thread addShutdownHook(final Closeable service, final Logger 
logger) {
                checkNotNull(service);
                checkNotNull(logger);
 
@@ -399,9 +396,7 @@ public class BlobUtils {
                        try {
                                socket.close();
                        } catch (Throwable t) {
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Error while closing resource 
after BLOB transfer.", t);
-                               }
+                               LOG.debug("Exception while closing BLOB server 
connection socket.", t);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 9cc6210..425461c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -234,8 +234,7 @@ public class JobClient {
                        int pos = 0;
                        for (BlobKey blobKey : props.requiredJarFiles()) {
                                try {
-                                       // TODO: make use of job-related BLOBs 
after adapting the BlobLibraryCacheManager
-                                       allURLs[pos++] = 
blobClient.getFile(blobKey).toURI().toURL();
+                                       allURLs[pos++] = 
blobClient.getFile(jobID, blobKey).toURI().toURL();
                                } catch (Exception e) {
                                        try {
                                                blobClient.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 9fc1fc4..bb0b3e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -231,7 +230,7 @@ public abstract class Dispatcher extends RpcEndpoint 
implements DispatcherGatewa
                Configuration configuration,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
-               BlobService blobService,
+               BlobServer blobServer,
                HeartbeatServices heartbeatServices,
                MetricRegistry metricRegistry,
                OnCompletionActions onCompleteActions,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 54d698e..dfd6a8a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -65,7 +64,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        Configuration configuration,
                        RpcService rpcService,
                        HighAvailabilityServices highAvailabilityServices,
-                       BlobService blobService,
+                       BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
                        MetricRegistry metricRegistry,
                        OnCompletionActions onCompleteActions,
@@ -77,7 +76,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        configuration,
                        rpcService,
                        highAvailabilityServices,
-                       blobService,
+                       blobServer,
                        heartbeatServices,
                        metricRegistry,
                        onCompleteActions,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 8728186..a7c6120 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -90,7 +89,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        ResourceID resourceId,
                        RpcService rpcService,
                        HighAvailabilityServices highAvailabilityServices,
-                       BlobService blobService,
+                       BlobServer blobService,
                        HeartbeatServices heartbeatServices,
                        MetricRegistry metricRegistry,
                        FatalErrorHandler fatalErrorHandler) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 9aff6f9..c8fc4e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -23,9 +23,12 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URL;
 import java.util.Arrays;
@@ -33,72 +36,52 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * For each job graph that is submitted to the system the library cache 
manager maintains
- * a set of libraries (typically JAR files) which the job requires to run. The 
library cache manager
- * caches library files in order to avoid unnecessary retransmission of data. 
It is based on a singleton
- * programming pattern, so there exists at most one library manager at a time.
- * <p>
- * All files registered via {@link #registerJob(JobID, Collection, 
Collection)} are reference-counted
- * and are removed by a timer-based cleanup task if their reference counter is 
zero.
+ * Provides facilities to download a set of libraries (typically JAR files) 
for a job from a
+ * {@link BlobService} and create a class loader with references to them.
  */
-public final class BlobLibraryCacheManager extends TimerTask implements 
LibraryCacheManager {
+public class BlobLibraryCacheManager implements LibraryCacheManager {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(BlobLibraryCacheManager.class);
+
+       private static final ExecutionAttemptID JOB_ATTEMPT_ID = new 
ExecutionAttemptID(-1, -1);
 
-       private static Logger LOG = 
LoggerFactory.getLogger(BlobLibraryCacheManager.class);
-       
-       private static ExecutionAttemptID JOB_ATTEMPT_ID = new 
ExecutionAttemptID(-1, -1);
-       
        // 
--------------------------------------------------------------------------------------------
-       
+
        /** The global lock to synchronize operations */
        private final Object lockObject = new Object();
 
        /** Registered entries per job */
-       private final Map<JobID, LibraryCacheEntry> cacheEntries = new 
HashMap<JobID, LibraryCacheEntry>();
-       
-       /** Map to store the number of reference to a specific file */
-       private final Map<BlobKey, Integer> blobKeyReferenceCounters = new 
HashMap<BlobKey, Integer>();
+       private final Map<JobID, LibraryCacheEntry> cacheEntries = new 
HashMap<>();
 
        /** The blob service to download libraries */
        private final BlobService blobService;
-       
-       private final Timer cleanupTimer;
-       
+
        // 
--------------------------------------------------------------------------------------------
 
-       /**
-        * Creates the blob library cache manager.
-        *
-        * @param blobService blob file retrieval service to use
-        * @param cleanupInterval cleanup interval in milliseconds
-        */
-       public BlobLibraryCacheManager(BlobService blobService, long 
cleanupInterval) {
+       public BlobLibraryCacheManager(BlobService blobService) {
                this.blobService = checkNotNull(blobService);
-
-               // Initializing the clean up task
-               this.cleanupTimer = new Timer(true);
-               this.cleanupTimer.schedule(this, cleanupInterval, 
cleanupInterval);
        }
 
-       // 
--------------------------------------------------------------------------------------------
-       
        @Override
        public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, 
Collection<URL> requiredClasspaths)
-                       throws IOException {
+               throws IOException {
                registerTask(id, JOB_ATTEMPT_ID, requiredJarFiles, 
requiredClasspaths);
        }
-       
+
        @Override
-       public void registerTask(JobID jobId, ExecutionAttemptID task, 
Collection<BlobKey> requiredJarFiles,
-                       Collection<URL> requiredClasspaths) throws IOException {
+       public void registerTask(
+               JobID jobId,
+               ExecutionAttemptID task,
+               @Nullable Collection<BlobKey> requiredJarFiles,
+               @Nullable Collection<URL> requiredClasspaths) throws 
IOException {
+
                checkNotNull(jobId, "The JobId must not be null.");
                checkNotNull(task, "The task execution id must not be null.");
 
@@ -113,43 +96,31 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
                        LibraryCacheEntry entry = cacheEntries.get(jobId);
 
                        if (entry == null) {
-                               // create a new entry in the library cache
-                               BlobKey[] keys = requiredJarFiles.toArray(new 
BlobKey[requiredJarFiles.size()]);
-                               URL[] urls = new URL[keys.length + 
requiredClasspaths.size()];
-
+                               URL[] urls = new URL[requiredJarFiles.size() + 
requiredClasspaths.size()];
                                int count = 0;
                                try {
-                                       for (; count < keys.length; count++) {
-                                               BlobKey blobKey = keys[count];
-                                               urls[count] = 
registerReferenceToBlobKeyAndGetURL(blobKey);
-                                       }
-                               }
-                               catch (Throwable t) {
-                                       // undo the reference count increases
-                                       try {
-                                               for (int i = 0; i < count; i++) 
{
-                                                       
unregisterReferenceToBlobKey(keys[i]);
-                                               }
+                                       // add URLs to locally cached JAR files
+                                       for (BlobKey key : requiredJarFiles) {
+                                               urls[count] = 
blobService.getFile(jobId, key).toURI().toURL();
+                                               ++count;
                                        }
-                                       catch (Throwable tt) {
-                                               LOG.error("Error while updating 
library reference counters.", tt);
+
+                                       // add classpaths
+                                       for (URL url : requiredClasspaths) {
+                                               urls[count] = url;
+                                               ++count;
                                        }
 
+                                       cacheEntries.put(jobId, new 
LibraryCacheEntry(
+                                               requiredJarFiles, 
requiredClasspaths, urls, task));
+                               } catch (Throwable t) {
                                        // rethrow or wrap
                                        ExceptionUtils.tryRethrowIOException(t);
-                                       throw new IOException("Library cache 
could not register the user code libraries.", t);
+                                       throw new IOException(
+                                               "Library cache could not 
register the user code libraries.", t);
                                }
-
-                               // add classpaths
-                               for (URL url : requiredClasspaths) {
-                                       urls[count] = url;
-                                       count++;
-                               }
-
-                               cacheEntries.put(jobId, new 
LibraryCacheEntry(requiredJarFiles, urls, task));
-                       }
-                       else {
-                               entry.register(task, requiredJarFiles);
+                       } else {
+                               entry.register(task, requiredJarFiles, 
requiredClasspaths);
                        }
                }
        }
@@ -158,7 +129,7 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
        public void unregisterJob(JobID id) {
                unregisterTask(id, JOB_ATTEMPT_ID);
        }
-       
+
        @Override
        public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
                checkNotNull(jobId, "The JobId must not be null.");
@@ -172,162 +143,167 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
                                        cacheEntries.remove(jobId);
 
                                        entry.releaseClassLoader();
-
-                                       for (BlobKey key : 
entry.getLibraries()) {
-                                               
unregisterReferenceToBlobKey(key);
-                                       }
                                }
                        }
                        // else has already been unregistered
                }
        }
-
+       
        @Override
-       public ClassLoader getClassLoader(JobID id) {
-               if (id == null) {
-                       throw new IllegalArgumentException("The JobId must not 
be null.");
-               }
-               
+       public ClassLoader getClassLoader(JobID jobId) {
+               checkNotNull(jobId, "The JobId must not be null.");
+
                synchronized (lockObject) {
-                       LibraryCacheEntry entry = cacheEntries.get(id);
-                       if (entry != null) {
-                               return entry.getClassLoader();
-                       } else {
-                               throw new IllegalStateException("No libraries 
are registered for job " + id);
+                       LibraryCacheEntry entry = cacheEntries.get(jobId);
+                       if (entry == null) {
+                               throw new IllegalStateException("No libraries 
are registered for job " + jobId);
                        }
+                       return entry.getClassLoader();
                }
        }
 
-       public int getBlobServerPort() {
-               return blobService.getPort();
-       }
-
-       @Override
-       public void shutdown() throws IOException{
-               try {
-                       run();
-               } catch (Throwable t) {
-                       LOG.warn("Failed to run clean up task before shutdown", 
t);
-               }
-
-               blobService.close();
-               cleanupTimer.cancel();
-       }
-       
        /**
-        * Cleans up blobs which are not referenced anymore
+        * Gets the number of tasks holding {@link ClassLoader} references for 
the given job.
+        *
+        * @param jobId ID of a job
+        *
+        * @return number of reference holders
         */
-       @Override
-       public void run() {
-               synchronized (lockObject) {
-                       Iterator<Map.Entry<BlobKey, Integer>> entryIter = 
blobKeyReferenceCounters.entrySet().iterator();
-                       
-                       while (entryIter.hasNext()) {
-                               Map.Entry<BlobKey, Integer> entry = 
entryIter.next();
-                               BlobKey key = entry.getKey();
-                               int references = entry.getValue();
-                               
-                               try {
-                                       if (references <= 0) {
-                                               blobService.delete(key);
-                                               entryIter.remove();
-                                       }
-                               } catch (Throwable t) {
-                                       LOG.warn("Could not delete file with 
blob key" + key, t);
-                               }
-                       }
-               }
-       }
-       
-       public int getNumberOfReferenceHolders(JobID jobId) {
+       int getNumberOfReferenceHolders(JobID jobId) {
                synchronized (lockObject) {
                        LibraryCacheEntry entry = cacheEntries.get(jobId);
                        return entry == null ? 0 : 
entry.getNumberOfReferenceHolders();
                }
        }
-       
-       int getNumberOfCachedLibraries() {
-               return blobKeyReferenceCounters.size();
-       }
-       
-       private URL registerReferenceToBlobKeyAndGetURL(BlobKey key) throws 
IOException {
-               // it is important that we fetch the URL before increasing the 
counter.
-               // in case the URL cannot be created (failed to fetch the 
BLOB), we have no stale counter
-               try {
-                       URL url = blobService.getFile(key).toURI().toURL();
 
-                       Integer references = blobKeyReferenceCounters.get(key);
-                       int newReferences = references == null ? 1 : references 
+ 1;
-                       blobKeyReferenceCounters.put(key, newReferences);
-
-                       return url;
-               }
-               catch (IOException e) {
-                       throw new IOException("Cannot get library with hash " + 
key, e);
-               }
+       /**
+        * Returns the number of registered jobs that this library cache 
manager handles.
+        *
+        * @return number of jobs (irrespective of the actual number of tasks 
per job)
+        */
+       int getNumberOfManagedJobs() {
+               // no synchronisation necessary
+               return cacheEntries.size();
        }
-       
-       private void unregisterReferenceToBlobKey(BlobKey key) {
-               Integer references = blobKeyReferenceCounters.get(key);
-               if (references != null) {
-                       int newReferences = Math.max(references - 1, 0);
-                       blobKeyReferenceCounters.put(key, newReferences);
-               }
-               else {
-                       // make sure we have an entry in any case, that the 
cleanup timer removes any
-                       // present libraries
-                       blobKeyReferenceCounters.put(key, 0);
+
+       @Override
+       public void shutdown() {
+               synchronized (lockObject) {
+                       for (LibraryCacheEntry entry : cacheEntries.values()) {
+                               entry.releaseClassLoader();
+                       }
                }
        }
 
-
        // 
--------------------------------------------------------------------------------------------
 
        /**
         * An entry in the per-job library cache. Tracks which execution 
attempts
         * still reference the libraries. Once none reference it any more, the
-        * libraries can be cleaned up.
+        * class loaders can be cleaned up.
         */
        private static class LibraryCacheEntry {
-               
+
                private final FlinkUserCodeClassLoader classLoader;
-               
+
                private final Set<ExecutionAttemptID> referenceHolders;
-               
+               /**
+                * Set of BLOB keys used for a previous job/task registration.
+                *
+                * <p>The purpose of this is to make sure that future registrations 
do not differ in content, as
+                * this is a contract of the {@link BlobLibraryCacheManager}.
+                */
                private final Set<BlobKey> libraries;
-               
-               
-               public LibraryCacheEntry(Collection<BlobKey> libraries, URL[] 
libraryURLs, ExecutionAttemptID initialReference) {
+
+               /**
+                * Set of class path URLs used for a previous job/task 
registration.
+                *
+                * <p>The purpose of this is to make sure that future registrations 
do not differ in content, as
+                * this is a contract of the {@link BlobLibraryCacheManager}.
+                */
+               private final Set<String> classPaths;
+
+               /**
+                * Creates a cache entry for a Flink user code class loader with the 
given <tt>libraryURLs</tt>.
+                *
+                * @param requiredLibraries
+                *              BLOB keys required by the class loader (stored 
for ensuring consistency among different
+                *              job/task registrations)
+                * @param requiredClasspaths
+                *              class paths required by the class loader 
(stored for ensuring consistency among
+                *              different job/task registrations)
+                * @param libraryURLs
+                *              complete list of URLs to use for the class 
loader (includes references to the
+                *              <tt>requiredLibraries</tt> and 
<tt>requiredClasspaths</tt>)
+                * @param initialReference
+                *              reference holder ID
+                */
+               LibraryCacheEntry(
+                               Collection<BlobKey> requiredLibraries,
+                               Collection<URL> requiredClasspaths,
+                               URL[] libraryURLs,
+                               ExecutionAttemptID initialReference) {
+
                        this.classLoader = new 
FlinkUserCodeClassLoader(libraryURLs);
-                       this.libraries = new HashSet<>(libraries);
+                       // NOTE: do not store the class paths, i.e. URLs, into 
a set for performance reasons
+                       //       see 
http://findbugs.sourceforge.net/bugDescriptions.html#DMI_COLLECTION_OF_URLS
+                       //       -> alternatively, compare their string 
representation
+                       this.classPaths = new 
HashSet<>(requiredClasspaths.size());
+                       for (URL url : requiredClasspaths) {
+                               classPaths.add(url.toString());
+                       }
+                       this.libraries = new HashSet<>(requiredLibraries);
                        this.referenceHolders = new HashSet<>();
                        this.referenceHolders.add(initialReference);
                }
-               
-               
+
                public ClassLoader getClassLoader() {
                        return classLoader;
                }
-               
+
                public Set<BlobKey> getLibraries() {
                        return libraries;
                }
-               
-               public void register(ExecutionAttemptID task, 
Collection<BlobKey> keys) {
-                       if (!libraries.containsAll(keys)) {
+
+               public void register(
+                               ExecutionAttemptID task, Collection<BlobKey> 
requiredLibraries,
+                               Collection<URL> requiredClasspaths) {
+
+                       // Make sure the previous registration referred to the 
same libraries and class paths.
+                       // NOTE: the original collections may contain 
duplicates and may not already be Set
+                       //       collections with fast checks whether an item 
is contained in it.
+
+                       // lazy construction of a new set for faster comparisons
+                       if (libraries.size() != requiredLibraries.size() ||
+                               !new 
HashSet<>(requiredLibraries).containsAll(libraries)) {
+
                                throw new IllegalStateException(
-                                               "The library registration 
references a different set of libraries than previous registrations for this 
job.");
+                                       "The library registration references a 
different set of library BLOBs than" +
+                                               " previous registrations for 
this job:\nold:" + libraries.toString() +
+                                               "\nnew:" + 
requiredLibraries.toString());
                        }
-                       
+
+                       // lazy construction of a new set with String 
representations of the URLs
+                       if (classPaths.size() != requiredClasspaths.size() ||
+                               
!requiredClasspaths.stream().map(URL::toString).collect(Collectors.toSet())
+                                       .containsAll(classPaths)) {
+
+                               throw new IllegalStateException(
+                                       "The library registration references a 
different set of library BLOBs than" +
+                                               " previous registrations for 
this job:\nold:" +
+                                               classPaths.toString() +
+                                               "\nnew:" + 
requiredClasspaths.toString());
+                       }
+
                        this.referenceHolders.add(task);
                }
-               
+
                public boolean unregister(ExecutionAttemptID task) {
                        referenceHolders.remove(task);
                        return referenceHolders.isEmpty();
                }
-               
-               public int getNumberOfReferenceHolders() {
+
+               int getNumberOfReferenceHolders() {
                        return referenceHolders.size();
                }
 
@@ -343,5 +319,4 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
                        }
                }
        }
-
 }
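
Side note on the consistency check in LibraryCacheEntry#register() above: class path
URLs are deliberately not stored in a HashSet<URL>, since URL#equals()/hashCode() may
perform DNS look-ups (findbugs DMI_COLLECTION_OF_URLS); their String representations
are compared instead. A minimal, self-contained sketch of that comparison (class name
and example paths are made up for illustration, not part of the patch):

    import java.net.MalformedURLException;
    import java.net.URL;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class ClasspathConsistencySketch {

        /** Compares class paths via their String form to avoid URL#equals() DNS look-ups. */
        static void checkSameClasspaths(Set<String> previous, Collection<URL> current) {
            Set<String> currentAsStrings = current.stream()
                .map(URL::toString)
                .collect(Collectors.toSet());
            if (!currentAsStrings.equals(previous)) {
                throw new IllegalStateException(
                    "The registration references a different set of class paths:\nold:"
                        + previous + "\nnew:" + current);
            }
        }

        public static void main(String[] args) throws MalformedURLException {
            Set<String> registered = new HashSet<>(Arrays.asList("file:/tmp/udf.jar"));

            // same class path, different URL instance -> passes
            checkSameClasspaths(registered, Arrays.asList(new URL("file:/tmp/udf.jar")));

            // different class path -> rejected
            try {
                checkSameClasspaths(registered, Arrays.asList(new URL("file:/tmp/other.jar")));
            } catch (IllegalStateException expected) {
                System.out.println(expected.getMessage());
            }
        }
    }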

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
index 8e14e58..41eeb18 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FallbackLibraryCacheManager.java
@@ -28,7 +28,7 @@ import java.net.URL;
 import java.util.Collection;
 
 public class FallbackLibraryCacheManager implements LibraryCacheManager {
-       
+
        private static Logger LOG = 
LoggerFactory.getLogger(FallbackLibraryCacheManager.class);
 
        @Override
@@ -40,10 +40,10 @@ public class FallbackLibraryCacheManager implements 
LibraryCacheManager {
        public void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, 
Collection<URL> requiredClasspaths) {
                LOG.warn("FallbackLibraryCacheManager cannot download files 
associated with blob keys.");
        }
-       
+
        @Override
        public void registerTask(JobID id, ExecutionAttemptID execution, 
Collection<BlobKey> requiredJarFiles,
-                       Collection<URL> requiredClasspaths) {
+               Collection<URL> requiredClasspaths) {
                LOG.warn("FallbackLibraryCacheManager cannot download files 
associated with blob keys.");
        }
 
@@ -51,7 +51,7 @@ public class FallbackLibraryCacheManager implements 
LibraryCacheManager {
        public void unregisterJob(JobID id) {
                LOG.warn("FallbackLibraryCacheManager does not book keeping of 
job IDs.");
        }
-       
+
        @Override
        public void unregisterTask(JobID id, ExecutionAttemptID execution) {
                LOG.warn("FallbackLibraryCacheManager does not book keeping of 
job IDs.");

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
index 5f9f443..93c6efd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/LibraryCacheManager.java
@@ -19,14 +19,15 @@
 package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
 
 public interface LibraryCacheManager {
+
        /**
         * Returns the user code class loader associated with id.
         *
@@ -36,30 +37,34 @@ public interface LibraryCacheManager {
        ClassLoader getClassLoader(JobID id);
 
        /**
-        * Registers a job with its required jar files and classpaths. The jar 
files are identified by their blob keys.
+        * Registers a job with its required jar files and classpaths. The jar 
files are identified by
+        * their blob keys and downloaded for use by a {@link ClassLoader}.
         *
         * @param id job ID
         * @param requiredJarFiles collection of blob keys identifying the 
required jar files
         * @param requiredClasspaths collection of classpaths that are added to 
the user code class loader
+        *
         * @throws IOException if any error occurs when retrieving the required 
jar files
         *
         * @see #unregisterJob(JobID) counterpart of this method
         */
        void registerJob(JobID id, Collection<BlobKey> requiredJarFiles, 
Collection<URL> requiredClasspaths)
-                       throws IOException;
-       
+               throws IOException;
+
        /**
-        * Registers a job task execution with its required jar files and 
classpaths. The jar files are identified by their blob keys.
+        * Registers a job task execution with its required jar files and 
classpaths. The jar files are
+        * identified by their blob keys and downloaded for use by a {@link 
ClassLoader}.
         *
         * @param id job ID
         * @param requiredJarFiles collection of blob keys identifying the 
required jar files
         * @param requiredClasspaths collection of classpaths that are added to 
the user code class loader
-        * @throws IOException
+        *
+        * @throws IOException if any error occurs when retrieving the required 
jar files
         *
         * @see #unregisterTask(JobID, ExecutionAttemptID) counterpart of this 
method
         */
        void registerTask(JobID id, ExecutionAttemptID execution, 
Collection<BlobKey> requiredJarFiles,
-                       Collection<URL> requiredClasspaths) throws IOException;
+               Collection<URL> requiredClasspaths) throws IOException;
 
        /**
         * Unregisters a job task execution from the library cache manager.
@@ -88,9 +93,7 @@ public interface LibraryCacheManager {
        void unregisterJob(JobID id);
 
        /**
-        * Shutdown method
-        *
-        * @throws IOException
+        * Shutdown method which may release created class loaders.
         */
-       void shutdown() throws IOException;
+       void shutdown();
 }
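
To make the contract documented in this interface concrete, a hypothetical caller on
the job-manager side could look as follows (class and method names are illustrative;
only the Flink types referenced in the diff are assumed to exist):

    import java.io.IOException;
    import java.net.URL;
    import java.util.Collection;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.blob.BlobKey;
    import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;

    public class LibraryCacheUsageSketch {

        /** Job-level use of the contract: every registerJob() is paired with an unregisterJob(). */
        static void withJobClassLoader(
                LibraryCacheManager libraryCache,
                JobID jobId,
                Collection<BlobKey> requiredJarFiles,
                Collection<URL> requiredClasspaths) throws IOException {

            libraryCache.registerJob(jobId, requiredJarFiles, requiredClasspaths);
            try {
                ClassLoader userCodeClassLoader = libraryCache.getClassLoader(jobId);
                // ... use the class loader, e.g. to deserialize user code objects ...
            } finally {
                libraryCache.unregisterJob(jobId);
            }
        }
    }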

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 6b92d79..c126875 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -536,7 +536,7 @@ public class JobGraph implements Serializable {
                        Configuration blobClientConfig) throws IOException {
                if (!userJars.isEmpty()) {
                        // TODO: make use of job-related BLOBs after adapting 
the BlobLibraryCacheManager
-                       List<BlobKey> blobKeys = 
BlobClient.uploadJarFiles(blobServerAddress, blobClientConfig, userJars);
+                       List<BlobKey> blobKeys = 
BlobClient.uploadJarFiles(blobServerAddress, blobClientConfig, jobID, userJars);
 
                        for (BlobKey blobKey : blobKeys) {
                                if (!userJarBlobKeys.contains(blobKey)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 5838cf2..c312cd3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -93,7 +93,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        final Configuration configuration,
                        final RpcService rpcService,
                        final HighAvailabilityServices haServices,
-                       final BlobService blobService,
+                       final BlobServer blobService,
                        final HeartbeatServices heartbeatServices,
                        final OnCompletionActions toNotifyOnComplete,
                        final FatalErrorHandler errorHandler) throws Exception {
@@ -116,7 +116,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        final Configuration configuration,
                        final RpcService rpcService,
                        final HighAvailabilityServices haServices,
-                       final BlobService blobService,
+                       final BlobServer blobService,
                        final HeartbeatServices heartbeatServices,
                        final MetricRegistry metricRegistry,
                        final OnCompletionActions toNotifyOnComplete,
@@ -199,6 +199,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                                haServices,
                                heartbeatServices,
                                jobManagerServices.executorService,
+                               jobManagerServices.blobServer,
                                jobManagerServices.libraryCacheManager,
                                jobManagerServices.restartStrategyFactory,
                                jobManagerServices.rpcAskTimeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index e14f5af..57aeaff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -19,11 +19,10 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.blob.BlobService;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
@@ -45,6 +44,7 @@ public class JobManagerServices {
 
        public final ScheduledExecutorService executorService;
 
+       public final BlobServer blobServer;
        public final BlobLibraryCacheManager libraryCacheManager;
 
        public final RestartStrategyFactory restartStrategyFactory;
@@ -53,11 +53,13 @@ public class JobManagerServices {
 
        public JobManagerServices(
                        ScheduledExecutorService executorService,
+                       BlobServer blobServer,
                        BlobLibraryCacheManager libraryCacheManager,
                        RestartStrategyFactory restartStrategyFactory,
                        Time rpcAskTimeout) {
 
                this.executorService = checkNotNull(executorService);
+               this.blobServer = checkNotNull(blobServer);
                this.libraryCacheManager = checkNotNull(libraryCacheManager);
                this.restartStrategyFactory = 
checkNotNull(restartStrategyFactory);
                this.rpcAskTimeout = checkNotNull(rpcAskTimeout);
@@ -80,8 +82,9 @@ public class JobManagerServices {
                        firstException = t;
                }
 
+               libraryCacheManager.shutdown();
                try {
-                       libraryCacheManager.shutdown();
+                       blobServer.close();
                }
                catch (Throwable t) {
                        if (firstException == null) {
@@ -103,16 +106,12 @@ public class JobManagerServices {
 
        public static JobManagerServices fromConfiguration(
                        Configuration config,
-                       BlobService blobService) throws Exception {
+                       BlobServer blobServer) throws Exception {
 
                Preconditions.checkNotNull(config);
-               Preconditions.checkNotNull(blobService);
+               Preconditions.checkNotNull(blobServer);
 
-               final long cleanupInterval = config.getLong(
-                       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-                       
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
-
-               final BlobLibraryCacheManager libraryCacheManager = new 
BlobLibraryCacheManager(blobService, cleanupInterval);
+               final BlobLibraryCacheManager libraryCacheManager = new 
BlobLibraryCacheManager(blobServer);
 
                final FiniteDuration timeout;
                try {
@@ -127,6 +126,7 @@ public class JobManagerServices {
 
                return new JobManagerServices(
                        futureExecutor,
+                       blobServer,
                        libraryCacheManager,
                        
RestartStrategyFactory.createRestartStrategyFactory(config),
                        Time.of(timeout.length(), timeout.unit()));
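
The shutdown hunk above releases the class loaders via the library cache manager first
(shutdown() no longer declares IOException) and only then closes the BlobServer,
remembering the first failure. A rough sketch of that pattern, assuming only these two
components need closing:

    import java.io.IOException;

    import org.apache.flink.runtime.blob.BlobServer;
    import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;

    public class ShutdownOrderSketch {

        /** Releases class loaders first, then closes the BLOB server, keeping the first failure. */
        static void shutdown(BlobLibraryCacheManager libraryCacheManager, BlobServer blobServer) throws Exception {
            Throwable firstException = null;

            libraryCacheManager.shutdown(); // no longer declares IOException

            try {
                blobServer.close();
            } catch (IOException e) {
                firstException = e;
            }

            if (firstException != null) {
                throw new Exception("Error while shutting down JobManager services", firstException);
            }
        }
    }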

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index d6019db..a8a8632 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -149,7 +150,10 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
        /** Service to contend for and retrieve the leadership of JM and RM */
        private final HighAvailabilityServices highAvailabilityServices;
 
-       /** Blob cache manager used across jobs */
+       /** Blob server used across jobs */
+       private final BlobServer blobServer;
+
+       /** Blob library cache manager used across jobs */
        private final BlobLibraryCacheManager libraryCacheManager;
 
        /** The metrics for the JobManager itself */
@@ -204,6 +208,7 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
                        HighAvailabilityServices highAvailabilityService,
                        HeartbeatServices heartbeatServices,
                        ScheduledExecutorService executor,
+                       BlobServer blobServer,
                        BlobLibraryCacheManager libraryCacheManager,
                        RestartStrategyFactory restartStrategyFactory,
                        Time rpcAskTimeout,
@@ -221,6 +226,7 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
                this.configuration = checkNotNull(configuration);
                this.rpcTimeout = rpcAskTimeout;
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityService);
+               this.blobServer = checkNotNull(blobServer);
                this.libraryCacheManager = checkNotNull(libraryCacheManager);
                this.executor = checkNotNull(executor);
                this.jobCompletionActions = checkNotNull(jobCompletionActions);
@@ -698,7 +704,7 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
        @Override
        public CompletableFuture<ClassloadingProps> requestClassloadingProps() {
                return CompletableFuture.completedFuture(
-                       new 
ClassloadingProps(libraryCacheManager.getBlobServerPort(),
+                       new ClassloadingProps(blobServer.getPort(),
                                executionGraph.getRequiredJarFiles(),
                                executionGraph.getRequiredClasspaths()));
        }
@@ -785,7 +791,7 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
 
                if (registeredTaskManagers.containsKey(taskManagerId)) {
                        final RegistrationResponse response = new 
JMTMRegistrationSuccess(
-                               resourceId, 
libraryCacheManager.getBlobServerPort());
+                               resourceId, blobServer.getPort());
                        return CompletableFuture.completedFuture(response);
                } else {
                        return getRpcService()
@@ -819,7 +825,7 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
                                                        }
                                                });
 
-                                               return new 
JMTMRegistrationSuccess(resourceId, libraryCacheManager.getBlobServerPort());
+                                               return new 
JMTMRegistrationSuccess(resourceId, blobServer.getPort());
                                        },
                                        getMainThreadExecutor());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index 98c7bf1..363c107 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
@@ -53,6 +54,9 @@ public class JobManagerConnection {
        // Checkpoint responder for the specific job manager
        private final CheckpointResponder checkpointResponder;
 
+       // BLOB cache connected to the BLOB server at the specific job manager
+       private final BlobCache blobCache;
+
        // Library cache manager connected to the specific job manager
        private final LibraryCacheManager libraryCacheManager;
 
@@ -63,21 +67,22 @@ public class JobManagerConnection {
        private final PartitionProducerStateChecker partitionStateChecker;
 
        public JobManagerConnection(
-               JobID jobID,
-               ResourceID resourceID,
-               JobMasterGateway jobMasterGateway,
-               UUID leaderId,
-               TaskManagerActions taskManagerActions,
-               CheckpointResponder checkpointResponder,
-               LibraryCacheManager libraryCacheManager,
-               ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier,
-               PartitionProducerStateChecker partitionStateChecker) {
+                               JobID jobID,
+                               ResourceID resourceID,
+                               JobMasterGateway jobMasterGateway,
+                               UUID leaderId,
+                               TaskManagerActions taskManagerActions,
+                               CheckpointResponder checkpointResponder,
+                               BlobCache blobCache, LibraryCacheManager 
libraryCacheManager,
+                               ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier,
+                               PartitionProducerStateChecker 
partitionStateChecker) {
                this.jobID = Preconditions.checkNotNull(jobID);
                this.resourceID = Preconditions.checkNotNull(resourceID);
                this.leaderId = Preconditions.checkNotNull(leaderId);
                this.jobMasterGateway = 
Preconditions.checkNotNull(jobMasterGateway);
                this.taskManagerActions = 
Preconditions.checkNotNull(taskManagerActions);
                this.checkpointResponder = 
Preconditions.checkNotNull(checkpointResponder);
+               this.blobCache = Preconditions.checkNotNull(blobCache);
                this.libraryCacheManager = 
Preconditions.checkNotNull(libraryCacheManager);
                this.resultPartitionConsumableNotifier = 
Preconditions.checkNotNull(resultPartitionConsumableNotifier);
                this.partitionStateChecker = 
Preconditions.checkNotNull(partitionStateChecker);
@@ -111,6 +116,15 @@ public class JobManagerConnection {
                return libraryCacheManager;
        }
 
+       /**
+        * Gets the BLOB cache connected to the respective BLOB server instance 
at the job manager.
+        *
+        * @return BLOB cache
+        */
+       public BlobCache getBlobCache() {
+               return blobCache;
+       }
+
        public ResultPartitionConsumableNotifier 
getResultPartitionConsumableNotifier() {
                return resultPartitionConsumableNotifier;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 4abcdf4..a5ce84b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -352,6 +352,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
                        TaskManagerActions taskManagerActions = 
jobManagerConnection.getTaskManagerActions();
                        CheckpointResponder checkpointResponder = 
jobManagerConnection.getCheckpointResponder();
+                       BlobCache blobCache = 
jobManagerConnection.getBlobCache();
                        LibraryCacheManager libraryCache = 
jobManagerConnection.getLibraryCacheManager();
                        ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier = 
jobManagerConnection.getResultPartitionConsumableNotifier();
                        PartitionProducerStateChecker partitionStateChecker = 
jobManagerConnection.getPartitionStateChecker();
@@ -374,6 +375,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                taskManagerActions,
                                inputSplitProvider,
                                checkpointResponder,
+                               blobCache,
                                libraryCache,
                                fileCache,
                                taskManagerConfiguration,
@@ -935,14 +937,13 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                InetSocketAddress blobServerAddress = new 
InetSocketAddress(jobMasterGateway.getHostname(), blobPort);
 
                final LibraryCacheManager libraryCacheManager;
+               final BlobCache blobCache;
                try {
-                       final BlobCache blobCache = new BlobCache(
+                       blobCache = new BlobCache(
                                blobServerAddress,
                                taskManagerConfiguration.getConfiguration(),
                                haServices.createBlobStore());
-                       libraryCacheManager = new BlobLibraryCacheManager(
-                               blobCache,
-                               taskManagerConfiguration.getCleanupInterval());
+                       libraryCacheManager = new 
BlobLibraryCacheManager(blobCache);
                } catch (IOException e) {
                        // Can't pass the IOException up - we need a 
RuntimeException anyway
                        // two levels up where this is run asynchronously. 
Also, we don't
@@ -967,6 +968,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        jobManagerLeaderId,
                        taskManagerActions,
                        checkpointResponder,
+                       blobCache,
                        libraryCacheManager,
                        resultPartitionConsumableNotifier,
                        partitionStateChecker);
@@ -977,6 +979,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                JobMasterGateway jobManagerGateway = 
jobManagerConnection.getJobManagerGateway();
                jobManagerGateway.disconnectTaskManager(getResourceID(), cause);
                jobManagerConnection.getLibraryCacheManager().shutdown();
+               jobManagerConnection.getBlobCache().close();
        }
 
        // 
------------------------------------------------------------------------
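
Taken together, the TaskExecutor hunks give each job-manager connection its own
BlobCache (pointing at that job manager's BlobServer) with a thin
BlobLibraryCacheManager on top; on disconnect the manager is shut down before the
cache is closed. A rough sketch of that wiring, assuming a non-HA setup with a
VoidBlobStore and made-up host/port values:

    import java.net.InetSocketAddress;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.blob.BlobCache;
    import org.apache.flink.runtime.blob.VoidBlobStore;
    import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;

    public class BlobCacheWiringSketch {

        public static void main(String[] args) throws Exception {
            // address of the job manager's BlobServer (made-up values)
            InetSocketAddress blobServerAddress = new InetSocketAddress("jobmanager-host", 50100);

            BlobCache blobCache = new BlobCache(blobServerAddress, new Configuration(), new VoidBlobStore());
            BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobCache);

            // ... register jobs/tasks and hand out class loaders while the connection is alive ...

            // on disconnect: release class loaders first, then close the cache
            libraryCacheManager.shutdown();
            blobCache.close();
        }
    }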

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index ea9f576..7c7693b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -53,8 +54,6 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
        private final Time maxRegistrationPause;
        private final Time refusedRegistrationPause;
 
-       private final long cleanupInterval;
-
        private final UnmodifiableConfiguration configuration;
 
        private final boolean exitJvmOnOutOfMemory;
@@ -78,7 +77,6 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                this.initialRegistrationPause = 
Preconditions.checkNotNull(initialRegistrationPause);
                this.maxRegistrationPause = 
Preconditions.checkNotNull(maxRegistrationPause);
                this.refusedRegistrationPause = 
Preconditions.checkNotNull(refusedRegistrationPause);
-               this.cleanupInterval = 
Preconditions.checkNotNull(cleanupInterval);
                this.configuration = new 
UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
                this.exitJvmOnOutOfMemory = exitJvmOnOutOfMemory;
        }
@@ -107,10 +105,6 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                return refusedRegistrationPause;
        }
 
-       public long getCleanupInterval() {
-               return cleanupInterval;
-       }
-
        @Override
        public Configuration getConfiguration() {
                return configuration;
@@ -153,9 +147,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
 
                LOG.info("Messages have a max timeout of " + timeout);
 
-               final long cleanupInterval = configuration.getLong(
-                       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-                       
ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+               final long cleanupInterval = 
configuration.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
 
                final Time finiteRegistrationDuration;
 

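The cleanup interval is now read from BlobServerOptions.CLEANUP_INTERVAL (in seconds)
instead of the deprecated ConfigConstants key, hence the * 1000 conversion above. A
hypothetical snippet, assuming the option keeps its second-based unit:

    import org.apache.flink.configuration.BlobServerOptions;
    import org.apache.flink.configuration.Configuration;

    public class CleanupIntervalSketch {

        public static void main(String[] args) {
            Configuration configuration = new Configuration();
            configuration.setLong(BlobServerOptions.CLEANUP_INTERVAL.key(), 3600L); // one hour, in seconds

            // same conversion as in TaskManagerConfiguration#fromConfiguration()
            long cleanupIntervalMillis = configuration.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
            System.out.println("cleanup every " + cleanupIntervalMillis + " ms");
        }
    }
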
http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 04cb990..d628960 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.SafetyNetCloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -203,7 +204,10 @@ public class Task implements Runnable, TaskActions {
        /** All listener that want to be notified about changes in the task's 
execution state */
        private final List<TaskExecutionStateListener> 
taskExecutionStateListeners;
 
-       /** The library cache, from which the task can request its required JAR 
files */
+       /** The BLOB cache, from which the task can request BLOB files */
+       private final BlobCache blobCache;
+
+       /** The library cache, from which the task can request its class loader 
*/
        private final LibraryCacheManager libraryCache;
 
        /** The cache for user-defined files that the invokable requires */
@@ -282,6 +286,7 @@ public class Task implements Runnable, TaskActions {
                TaskManagerActions taskManagerActions,
                InputSplitProvider inputSplitProvider,
                CheckpointResponder checkpointResponder,
+               BlobCache blobCache,
                LibraryCacheManager libraryCache,
                FileCache fileCache,
                TaskManagerRuntimeInfo taskManagerConfig,
@@ -330,6 +335,7 @@ public class Task implements Runnable, TaskActions {
                this.checkpointResponder = 
Preconditions.checkNotNull(checkpointResponder);
                this.taskManagerActions = checkNotNull(taskManagerActions);
 
+               this.blobCache = Preconditions.checkNotNull(blobCache);
                this.libraryCache = Preconditions.checkNotNull(libraryCache);
                this.fileCache = Preconditions.checkNotNull(fileCache);
                this.network = Preconditions.checkNotNull(networkEnvironment);
@@ -568,6 +574,8 @@ public class Task implements Runnable, TaskActions {
                        LOG.info("Creating FileSystem stream leak safety net 
for task {}", this);
                        FileSystemSafetyNet.initializeSafetyNetForThread();
 
+                       blobCache.registerJob(jobId);
+
                        // first of all, get a user-code classloader
                        // this may involve downloading the job's JAR files 
and/or classes
                        LOG.info("Loading JAR files for task {}.", this);
@@ -827,6 +835,7 @@ public class Task implements Runnable, TaskActions {
 
                                // remove all of the tasks library resources
                                libraryCache.unregisterTask(jobId, executionId);
+                               blobCache.releaseJob(jobId);
 
                                // remove all files in the distributed cache
                                removeCachedFiles(distributedCacheEntries, 
fileCache);
@@ -862,7 +871,7 @@ public class Task implements Runnable, TaskActions {
                // triggers the download of all missing jar files from the job 
manager
                libraryCache.registerTask(jobId, executionId, requiredJarFiles, 
requiredClasspaths);
 
-               LOG.debug("Register task {} at library cache manager took {} 
milliseconds",
+               LOG.debug("Getting user code class loader for task {} at 
library cache manager took {} milliseconds",
                                executionId, System.currentTimeMillis() - 
startDownloadTime);
 
                ClassLoader userCodeClassLoader = 
libraryCache.getClassLoader(jobId);

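The Task hunks above add job-level ref-counting at the BlobCache: registerJob() before
the user code class loader is obtained, releaseJob() during cleanup. A condensed sketch
of that pairing (class and method names are made up; the invokable call is elided):

    import java.io.IOException;
    import java.net.URL;
    import java.util.Collection;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.blob.BlobCache;
    import org.apache.flink.runtime.blob.BlobKey;
    import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
    import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

    public class TaskBlobLifecycleSketch {

        static void runTask(
                BlobCache blobCache,
                LibraryCacheManager libraryCache,
                JobID jobId,
                ExecutionAttemptID executionId,
                Collection<BlobKey> requiredJarFiles,
                Collection<URL> requiredClasspaths) throws IOException {

            blobCache.registerJob(jobId);
            try {
                // triggers the download of all missing jar files from the job manager
                libraryCache.registerTask(jobId, executionId, requiredJarFiles, requiredClasspaths);
                ClassLoader userCodeClassLoader = libraryCache.getClassLoader(jobId);

                // ... load and run the invokable with userCodeClassLoader ...
            } finally {
                libraryCache.unregisterTask(jobId, executionId);
                blobCache.releaseJob(jobId);
            }
        }
    }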