http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index cd7b363..61c61b4 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -18,11 +18,12 @@
 
 package org.apache.flink.runtime.clusterframework
 
-import java.util.concurrent.{ScheduledExecutorService, Executor}
+import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -51,6 +52,7 @@ import scala.language.postfixOps
   * @param instanceManager Instance manager to manage the registered
   *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
   * @param scheduler Scheduler to schedule Flink jobs
+  * @param blobServer Server instance to store BLOBs for the individual tasks
   * @param libraryCacheManager Manager to manage uploaded jar files
   * @param archive Archive for finished Flink jobs
   * @param restartStrategyFactory Restart strategy to be used in case of a job 
recovery
@@ -63,6 +65,7 @@ abstract class ContaineredJobManager(
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
+    blobServer: BlobServer,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -78,6 +81,7 @@ abstract class ContaineredJobManager(
     ioExecutor,
     instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     archive,
     restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1616a7b..8c551a7 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -126,6 +126,7 @@ class JobManager(
     protected val ioExecutor: Executor,
     protected val instanceManager: InstanceManager,
     protected val scheduler: FlinkScheduler,
+    protected val blobServer: BlobServer,
     protected val libraryCacheManager: BlobLibraryCacheManager,
     protected val archive: ActorRef,
     protected val restartStrategyFactory: RestartStrategyFactory,
@@ -272,11 +273,12 @@ class JobManager(
 
     instanceManager.shutdown()
     scheduler.shutdown()
+    libraryCacheManager.shutdown()
 
     try {
-      libraryCacheManager.shutdown()
+      blobServer.close()
     } catch {
-      case e: IOException => log.error("Could not properly shutdown the 
library cache manager.", e)
+      case e: IOException => log.error("Could not properly shutdown the blob 
server.", e)
     }
 
     // failsafe shutdown of the metrics registry
@@ -422,7 +424,7 @@ class JobManager(
         taskManager ! decorateMessage(
           AlreadyRegistered(
             instanceID,
-            libraryCacheManager.getBlobServerPort))
+            blobServer.getPort))
       } else {
         try {
           val actorGateway = new AkkaActorGateway(taskManager, 
leaderSessionID.orNull)
@@ -437,7 +439,7 @@ class JobManager(
           taskManagerMap.put(taskManager, instanceID)
 
           taskManager ! decorateMessage(
-            AcknowledgeRegistration(instanceID, 
libraryCacheManager.getBlobServerPort))
+            AcknowledgeRegistration(instanceID, blobServer.getPort))
 
           // to be notified when the taskManager is no longer reachable
           context.watch(taskManager)
@@ -839,6 +841,7 @@ class JobManager(
         try {
           log.info(s"Disposing savepoint at '$savepointPath'.")
           //TODO user code class loader ?
+          // (has not been used so far and new savepoints can simply be 
deleted by file)
           val savepoint = SavepointStore.loadSavepoint(
             savepointPath,
             Thread.currentThread().getContextClassLoader)
@@ -1060,7 +1063,7 @@ class JobManager(
         case Some((graph, jobInfo)) =>
           sender() ! decorateMessage(
             ClassloadingProps(
-              libraryCacheManager.getBlobServerPort,
+              blobServer.getPort,
               graph.getRequiredJarFiles,
               graph.getRequiredClasspaths))
         case None =>
@@ -1068,7 +1071,7 @@ class JobManager(
       }
 
     case RequestBlobManagerPort =>
-      sender ! decorateMessage(libraryCacheManager.getBlobServerPort)
+      sender ! decorateMessage(blobServer.getPort)
 
     case RequestArchive =>
       sender ! decorateMessage(ResponseArchive(archive))
@@ -1254,8 +1257,8 @@ class JobManager(
         // because this makes sure that the uploaded jar files are removed in 
case of
         // unsuccessful
         try {
-          libraryCacheManager.registerJob(jobGraph.getJobID, 
jobGraph.getUserJarBlobKeys,
-            jobGraph.getClasspaths)
+          libraryCacheManager.registerJob(
+            jobGraph.getJobID, jobGraph.getUserJarBlobKeys, 
jobGraph.getClasspaths)
         }
         catch {
           case t: Throwable =>
@@ -1344,6 +1347,7 @@ class JobManager(
           log.error(s"Failed to submit job $jobId ($jobName)", t)
 
           libraryCacheManager.unregisterJob(jobId)
+          blobServer.cleanupJob(jobId)
           currentJobs.remove(jobId)
 
           if (executionGraph != null) {
@@ -1785,12 +1789,10 @@ class JobManager(
       case None => None
     }
 
-    try {
-      libraryCacheManager.unregisterJob(jobID)
-    } catch {
-      case t: Throwable =>
-        log.error(s"Could not properly unregister job $jobID from the library 
cache.", t)
-    }
+    // remove all job-related BLOBs from local and HA store
+    libraryCacheManager.unregisterJob(jobID)
+    blobServer.cleanupJob(jobID)
+
     jobManagerMetricGroup.foreach(_.removeJob(jobID))
 
     futureOption
@@ -2463,6 +2465,7 @@ object JobManager {
       blobStore: BlobStore) :
     (InstanceManager,
     FlinkScheduler,
+    BlobServer,
     BlobLibraryCacheManager,
     RestartStrategyFactory,
     FiniteDuration, // timeout
@@ -2474,10 +2477,6 @@ object JobManager {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
 
-    val cleanupInterval = configuration.getLong(
-      ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
-      ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
-
     val restartStrategy = 
RestartStrategyFactory.createRestartStrategyFactory(configuration)
 
     val archiveCount = configuration.getInteger(WebOptions.ARCHIVE_COUNT)
@@ -2508,21 +2507,21 @@ object JobManager {
       blobServer = new BlobServer(configuration, blobStore)
       instanceManager = new InstanceManager()
       scheduler = new 
FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
-      libraryCacheManager = new BlobLibraryCacheManager(blobServer, 
cleanupInterval)
+      libraryCacheManager = new BlobLibraryCacheManager(blobServer)
 
       instanceManager.addInstanceListener(scheduler)
     }
     catch {
       case t: Throwable =>
-        if (libraryCacheManager != null) {
-          libraryCacheManager.shutdown()
-        }
         if (scheduler != null) {
           scheduler.shutdown()
         }
         if (instanceManager != null) {
           instanceManager.shutdown()
         }
+        if (libraryCacheManager != null) {
+          libraryCacheManager.shutdown()
+        }
         if (blobServer != null) {
           blobServer.close()
         }
@@ -2554,6 +2553,7 @@ object JobManager {
 
     (instanceManager,
       scheduler,
+      blobServer,
       libraryCacheManager,
       restartStrategy,
       timeout,
@@ -2627,6 +2627,7 @@ object JobManager {
 
     val (instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     restartStrategy,
     timeout,
@@ -2654,6 +2655,7 @@ object JobManager {
       ioExecutor,
       instanceManager,
       scheduler,
+      blobServer,
       libraryCacheManager,
       archive,
       restartStrategy,
@@ -2693,6 +2695,7 @@ object JobManager {
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
+    blobServer: BlobServer,
     libraryCacheManager: LibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
@@ -2710,6 +2713,7 @@ object JobManager {
       ioExecutor,
       instanceManager,
       scheduler,
+      blobServer,
       libraryCacheManager,
       archive,
       restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 0ae00a9..dcf9dd0 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
JobManagerOptions, QueryableStateOptions, ResourceManagerOptions, 
TaskManagerOptions}
 import org.apache.flink.core.fs.Path
+import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
@@ -133,6 +134,7 @@ class LocalFlinkMiniCluster(
 
     val (instanceManager,
     scheduler,
+    blobServer,
     libraryCacheManager,
     restartStrategyFactory,
     timeout,
@@ -164,6 +166,7 @@ class LocalFlinkMiniCluster(
         ioExecutor,
         instanceManager,
         scheduler,
+        blobServer,
         libraryCacheManager,
         archive,
         restartStrategyFactory,
@@ -279,6 +282,7 @@ class LocalFlinkMiniCluster(
       ioExecutor: Executor,
       instanceManager: InstanceManager,
       scheduler: Scheduler,
+      blobServer: BlobServer,
       libraryCacheManager: BlobLibraryCacheManager,
       archive: ActorRef,
       restartStrategyFactory: RestartStrategyFactory,
@@ -297,6 +301,7 @@ class LocalFlinkMiniCluster(
       ioExecutor,
       instanceManager,
       scheduler,
+      blobServer,
       libraryCacheManager,
       archive,
       restartStrategyFactory,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 0c419eb..431adb6 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -35,7 +35,7 @@ import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, 
QuarantineMonitor}
-import org.apache.flink.runtime.blob.{BlobCache, BlobClient, BlobService}
+import org.apache.flink.runtime.blob.{BlobCache, BlobClient}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -160,7 +160,7 @@ class TaskManager(
     * registered at the job manager */
   private val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
 
-  private var blobService: Option[BlobService] = None
+  private var blobCache: Option[BlobCache] = None
   private var libraryCacheManager: Option[LibraryCacheManager] = None
 
   /* The current leading JobManager Actor associated with */
@@ -333,11 +333,11 @@ class TaskManager(
       killTaskManagerFatal(message, cause)
 
     case RequestTaskManagerLog(requestType : LogTypeRequest) =>
-      blobService match {
+      blobCache match {
         case Some(_) =>
           handleRequestTaskManagerLog(sender(), requestType, 
currentJobManager.get)
         case None =>
-          sender() ! akka.actor.Status.Failure(new IOException("BlobService 
not " +
+          sender() ! akka.actor.Status.Failure(new IOException("BlobCache not 
" +
             "available. Cannot upload TaskManager logs."))
       }
 
@@ -840,7 +840,7 @@ class TaskManager(
         if (file.exists()) {
           val fis = new FileInputStream(file);
           Future {
-            val client: BlobClient = blobService.get.createClient()
+            val client: BlobClient = blobCache.get.createClient()
             client.put(fis);
           }(context.dispatcher)
             .onComplete {
@@ -915,7 +915,7 @@ class TaskManager(
       "starting network stack and library cache.")
 
     // sanity check that the JobManager dependent components are not set up 
currently
-    if (connectionUtils.isDefined || blobService.isDefined) {
+    if (connectionUtils.isDefined || blobCache.isDefined) {
       throw new IllegalStateException("JobManager-specific components are 
already initialized.")
     }
 
@@ -968,9 +968,9 @@ class TaskManager(
           address,
           config.getConfiguration(),
           highAvailabilityServices.createBlobStore())
-        blobService = Option(blobcache)
+        blobCache = Option(blobcache)
         libraryCacheManager = Some(
-          new BlobLibraryCacheManager(blobcache, config.getCleanupInterval()))
+          new BlobLibraryCacheManager(blobcache))
       }
       catch {
         case e: Exception =>
@@ -1047,18 +1047,11 @@ class TaskManager(
 
     // shut down BLOB and library cache
     libraryCacheManager foreach {
-      manager =>
-        try {
-          manager.shutdown()
-        } catch {
-          case ioe: IOException => log.error(
-            "Could not properly shutdown library cache manager.",
-            ioe)
-        }
+      manager => manager.shutdown()
     }
     libraryCacheManager = None
 
-    blobService foreach {
+    blobCache foreach {
       service =>
         try {
           service.close()
@@ -1066,7 +1059,7 @@ class TaskManager(
           case ioe: IOException => log.error("Could not properly shutdown blob 
service.", ioe)
         }
     }
-    blobService = None
+    blobCache = None
 
     // disassociate the slot environment
     connectionUtils = None
@@ -1130,6 +1123,10 @@ class TaskManager(
         case Some(manager) => manager
         case None => throw new IllegalStateException("There is no valid 
library cache manager.")
       }
+      val blobCache = this.blobCache match {
+        case Some(manager) => manager
+        case None => throw new IllegalStateException("There is no valid BLOB 
cache.")
+      }
 
       val slot = tdd.getTargetSlotNumber
       if (slot < 0 || slot >= numberOfSlots) {
@@ -1200,6 +1197,7 @@ class TaskManager(
         taskManagerConnection,
         inputSplitProvider,
         checkpointResponder,
+        blobCache,
         libCache,
         fileCache,
         config,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
new file mode 100644
index 0000000..afd365b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A few tests for the deferred ref-counting based cleanup inside the {@link 
BlobCache}.
+ */
+public class BlobCacheCleanupTest extends TestLogger {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       /**
+        * Tests that {@link BlobCache} cleans up after calling
+        * {@link BlobCache#releaseJob(JobID)}.
+        *
+        * <p>Two BLOBs are uploaded to a {@link BlobServer} for one job and
+        * fetched through a {@link BlobCache} on which the job is registered
+        * twice. After the first release both cached files must still exist;
+        * after the second release the test polls every 100 ms, for at most
+        * 30 s, until the cached files are gone. The server's copies must
+        * stay in place until the server itself is closed.
+        *
+        * <p>{@link BlobServerOptions#CLEANUP_INTERVAL} is set to 1 (presumably
+        * seconds - confirm against the option's documentation) so that the
+        * deferred cleanup does not take long.
+        */
+       @Test
+       public void testJobCleanup() throws IOException, InterruptedException {
+
+               JobID jobId = new JobID();
+               List<BlobKey> keys = new ArrayList<BlobKey>();
+               BlobServer server = null;
+               BlobCache cache = null;
+
+               final byte[] buf = new byte[128];
+
+               try {
+                       Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                               temporaryFolder.newFolder().getAbsolutePath());
+                       config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+                       server = new BlobServer(config, new VoidBlobStore());
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+
+                       // upload blobs
+                       try (BlobClient bc = new BlobClient(serverAddress, 
config)) {
+                               keys.add(bc.put(jobId, buf));
+                               buf[0] += 1; // change the payload before the second upload
+                               keys.add(bc.put(jobId, buf));
+                       }
+
+                       cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(0, jobId, cache);
+
+                       // register once
+                       cache.registerJob(jobId);
+
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(0, jobId, cache);
+
+                       for (BlobKey key : keys) {
+                               cache.getFile(jobId, key);
+                       }
+
+                       // register again (let's say, from another thread or so)
+                       cache.registerJob(jobId);
+                       for (BlobKey key : keys) {
+                               cache.getFile(jobId, key);
+                       }
+
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(2, jobId, cache);
+
+                       // after releasing once, nothing should change
+                       cache.releaseJob(jobId);
+
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(2, jobId, cache);
+
+                       // after releasing the second time, the job is up for 
deferred cleanup
+                       cache.releaseJob(jobId);
+
+                       // because we cannot guarantee that there are not 
thread races in the build system, we
+                       // loop for a certain while until the references 
disappear
+                       {
+                               long deadline = System.currentTimeMillis() + 
30_000L;
+                               do {
+                                       Thread.sleep(100);
+                               }
+                               while (checkFilesExist(jobId, keys, cache, 
false) != 0 &&
+                                       System.currentTimeMillis() < deadline);
+                       }
+
+                       // the blob cache should no longer contain the files
+                       // this fails if we exited via a timeout
+                       checkFileCountForJob(0, jobId, cache);
+                       // server should be unaffected
+                       checkFileCountForJob(2, jobId, server);
+               }
+               finally {
+                       if (cache != null) {
+                               cache.close();
+                       }
+
+                       if (server != null) {
+                               server.close();
+                       }
+                       // now everything should be cleaned up
+                       checkFileCountForJob(0, jobId, server);
+               }
+       }
+
+       /**
+        * Tests that {@link BlobCache} cleans up after calling
+        * {@link BlobCache#releaseJob(JobID)} but only after preserving the
+        * file for a bit longer.
+        *
+        * <p>Manual test (ignored by default) because it has to wait for the
+        * cleanup interval to pass. The job is registered and released twice;
+        * after the final release the cached files must remain accessible
+        * immediately and again after one fifth of the cleanup interval. Once
+        * the full interval has elapsed, the test polls every 100 ms, for at
+        * most 30 s, until the cached files are gone. The server's copies
+        * must stay in place until the server itself is closed.
+        */
+       @Test
+       @Ignore("manual test due to stalling: ensures a BLOB is retained first and only deleted after the (long) timeout ")
+       public void testJobDeferredCleanup() throws IOException, InterruptedException {
+               // value handed to BlobServerOptions.CLEANUP_INTERVAL; given the
+               // expected window below it is expressed in seconds:
+               // file should be deleted between 5 and 10s after last job release
+               long cleanupInterval = 5L;
+               // Thread.sleep() expects milliseconds, so convert before waiting
+               long cleanupIntervalMillis = cleanupInterval * 1000L;
+
+               JobID jobId = new JobID();
+               List<BlobKey> keys = new ArrayList<BlobKey>();
+               BlobServer server = null;
+               BlobCache cache = null;
+
+               final byte[] buf = new byte[128];
+
+               try {
+                       Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                               temporaryFolder.newFolder().getAbsolutePath());
+                       config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);
+
+                       server = new BlobServer(config, new VoidBlobStore());
+                       InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
+
+                       // upload blobs
+                       try (BlobClient bc = new BlobClient(serverAddress, config)) {
+                               keys.add(bc.put(jobId, buf));
+                               // change the payload before the second upload
+                               buf[0] += 1;
+                               keys.add(bc.put(jobId, buf));
+                       }
+
+                       cache = new BlobCache(serverAddress, config, new VoidBlobStore());
+
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(0, jobId, cache);
+
+                       // register once
+                       cache.registerJob(jobId);
+
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(0, jobId, cache);
+
+                       for (BlobKey key : keys) {
+                               cache.getFile(jobId, key);
+                       }
+
+                       // register again (let's say, from another thread or so)
+                       cache.registerJob(jobId);
+                       for (BlobKey key : keys) {
+                               cache.getFile(jobId, key);
+                       }
+
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(2, jobId, cache);
+
+                       // after releasing once, nothing should change
+                       cache.releaseJob(jobId);
+
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(2, jobId, cache);
+
+                       // after releasing the second time, the job is up for
+                       // deferred cleanup
+                       cache.releaseJob(jobId);
+
+                       // files should still be accessible for now
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+                       checkFileCountForJob(2, jobId, cache);
+
+                       // wait one fifth of the interval (1s for a 5s interval)
+                       Thread.sleep(cleanupIntervalMillis / 5);
+                       // still accessible...
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, true));
+                       checkFileCountForJob(2, jobId, cache);
+
+                       // wait out the remaining four fifths of the interval
+                       Thread.sleep((cleanupIntervalMillis * 4) / 5);
+
+                       // files are up for cleanup now...wait for it:
+                       // because we cannot guarantee that there are no thread
+                       // races in the build system, we loop for a certain
+                       // while until the references disappear
+                       {
+                               long deadline = System.currentTimeMillis() + 30_000L;
+                               do {
+                                       Thread.sleep(100);
+                               }
+                               while (checkFilesExist(jobId, keys, cache, false) != 0 &&
+                                       System.currentTimeMillis() < deadline);
+                       }
+
+                       // the blob cache should no longer contain the files
+                       // this fails if we exited via a timeout
+                       checkFileCountForJob(0, jobId, cache);
+                       // server should be unaffected
+                       checkFileCountForJob(2, jobId, server);
+               }
+               finally {
+                       if (cache != null) {
+                               cache.close();
+                       }
+
+                       if (server != null) {
+                               server.close();
+                       }
+                       // now everything should be cleaned up
+                       checkFileCountForJob(0, jobId, server);
+               }
+       }
+
+       /**
+        * Checks how many of the files given by blob keys are accessible.
+        *
+        * @param jobId
+        *              ID of a job
+        * @param keys
+        *              blob keys to check
+        * @param blobService
+        *              BLOB store to use
+        * @param doThrow
+        *              whether exceptions should be ignored (<tt>false</tt>), 
or thrown (<tt>true</tt>)
+        *
+        * @return number of files we were able to retrieve via {@link 
BlobService#getFile}
+        */
+       public static int checkFilesExist(
+               JobID jobId, Collection<BlobKey> keys, BlobService blobService, 
boolean doThrow)
+               throws IOException {
+
+               int numFiles = 0;
+
+               for (BlobKey key : keys) {
+                       final File blobFile;
+                       if (blobService instanceof BlobServer) {
+                               BlobServer server = (BlobServer) blobService;
+                               blobFile = server.getStorageLocation(jobId, 
key);
+                       } else {
+                               BlobCache cache = (BlobCache) blobService;
+                               blobFile = cache.getStorageLocation(jobId, key);
+                       }
+                       if (blobFile.exists()) {
+                               ++numFiles;
+                       } else if (doThrow) {
+                               throw new IOException("File " + blobFile + " 
does not exist.");
+                       }
+               }
+
+               return numFiles;
+       }
+
+       /**
+        * Checks how many of the files given by blob keys are accessible.
+        *
+        * @param expectedCount
+        *              number of expected files in the blob service for the 
given job
+        * @param jobId
+        *              ID of a job
+        * @param blobService
+        *              BLOB store to use
+        *
+        * @return number of files we were able to retrieve via {@link 
BlobService#getFile}
+        */
+       public static void checkFileCountForJob(
+               int expectedCount, JobID jobId, BlobService blobService)
+               throws IOException {
+
+               final File jobDir;
+               if (blobService instanceof BlobServer) {
+                       BlobServer server = (BlobServer) blobService;
+                       jobDir = server.getStorageLocation(jobId, new 
BlobKey()).getParentFile();
+               } else {
+                       BlobCache cache = (BlobCache) blobService;
+                       jobDir = cache.getStorageLocation(jobId, new 
BlobKey()).getParentFile();
+               }
+               File[] blobsForJob = jobDir.listFiles();
+               if (blobsForJob == null) {
+                       if (expectedCount != 0) {
+                               throw new IOException("File " + jobDir + " does 
not exist.");
+                       }
+               } else {
+                       assertEquals("Too many/few files in job dir: " +
+                                       Arrays.asList(blobsForJob).toString(), 
expectedCount,
+                               blobsForJob.length);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 8c575a9..0060ccb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -37,7 +39,7 @@ import static org.junit.Assert.*;
 /**
  * Unit tests for the blob cache retrying the connection to the server.
  */
-public class BlobCacheRetriesTest {
+public class BlobCacheRetriesTest extends TestLogger {
 
        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index d511e86..6d6bfd5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -39,6 +39,8 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.TestLogger;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -46,7 +48,7 @@ import static org.junit.Assert.fail;
 /**
  * This class contains unit tests for the {@link BlobClient}.
  */
-public class BlobClientTest {
+public class BlobClientTest extends TestLogger {
 
        /** The buffer size used during the tests in bytes. */
        private static final int TEST_BUFFER_SIZE = 17 * 1000;
@@ -214,7 +216,7 @@ public class BlobClientTest {
         * Tests the PUT/GET operations for content-addressable buffers.
         */
        @Test
-       public void testContentAddressableBuffer() {
+       public void testContentAddressableBuffer() throws IOException {
 
                BlobClient client = null;
 
@@ -256,10 +258,6 @@ public class BlobClientTest {
                                // expected
                        }
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
                finally {
                        if (client != null) {
                                try {
@@ -281,7 +279,7 @@ public class BlobClientTest {
         * Tests the PUT/GET operations for content-addressable streams.
         */
        @Test
-       public void testContentAddressableStream() {
+       public void testContentAddressableStream() throws IOException {
 
                BlobClient client = null;
                InputStream is = null;
@@ -313,10 +311,6 @@ public class BlobClientTest {
                        validateGetAndClose(client.get(receivedKey), testFile);
                        validateGetAndClose(client.get(jobId, receivedKey), 
testFile);
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
                finally {
                        if (is != null) {
                                try {
@@ -332,7 +326,7 @@ public class BlobClientTest {
        }
 
        /**
-        * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, List)} helper.
+        * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
         */
        @Test
        public void testUploadJarFilesHelper() throws Exception {
@@ -340,7 +334,7 @@ public class BlobClientTest {
        }
 
        /**
-        * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, List)} helper.
+        * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
         */
        static void uploadJarFile(BlobServer blobServer, Configuration 
blobClientConfig) throws Exception {
                final File testFile = File.createTempFile("testfile", ".dat");
@@ -354,15 +348,16 @@ public class BlobClientTest {
        }
 
        private static void uploadJarFile(
-               final InetSocketAddress serverAddress, final Configuration 
blobClientConfig,
-               final File testFile) throws IOException {
+                       final InetSocketAddress serverAddress, final 
Configuration blobClientConfig,
+                       final File testFile) throws IOException {
+               JobID jobId = new JobID();
                List<BlobKey> blobKeys = 
BlobClient.uploadJarFiles(serverAddress, blobClientConfig,
-                       Collections.singletonList(new Path(testFile.toURI())));
+                       jobId, Collections.singletonList(new 
Path(testFile.toURI())));
 
                assertEquals(1, blobKeys.size());
 
                try (BlobClient blobClient = new BlobClient(serverAddress, 
blobClientConfig)) {
-                       validateGetAndClose(blobClient.get(blobKeys.get(0)), 
testFile);
+                       validateGetAndClose(blobClient.get(jobId, 
blobKeys.get(0)), testFile);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 4071a1c..43bc622 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -29,12 +29,14 @@ import java.io.IOException;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 /**
  * This class contains unit tests for the {@link BlobKey} class.
  */
-public final class BlobKeyTest {
+public final class BlobKeyTest extends TestLogger {
        /**
         * The first key array to be used during the unit tests.
         */
@@ -106,4 +108,4 @@ public final class BlobKeyTest {
 
                assertEquals(k1, k2);
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 413e2e9..6bb5ab5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -41,6 +41,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
+import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -62,7 +64,7 @@ public class BlobServerDeleteTest extends TestLogger {
        public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
        @Test
-       public void testDeleteSingleByBlobKey() {
+       public void testDeleteSingleByBlobKey() throws IOException {
                BlobServer server = null;
                BlobClient client = null;
                BlobStore blobStore = new VoidBlobStore();
@@ -131,10 +133,6 @@ public class BlobServerDeleteTest extends TestLogger {
                                // expected
                        }
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
                finally {
                        cleanup(server, client);
                }
@@ -153,16 +151,16 @@ public class BlobServerDeleteTest extends TestLogger {
        }
 
        @Test
-       public void testDeleteAlreadyDeletedNoJob() {
+       public void testDeleteAlreadyDeletedNoJob() throws IOException {
                testDeleteAlreadyDeleted(null);
        }
 
        @Test
-       public void testDeleteAlreadyDeletedForJob() {
+       public void testDeleteAlreadyDeletedForJob() throws IOException {
                testDeleteAlreadyDeleted(new JobID());
        }
 
-       private void testDeleteAlreadyDeleted(final JobID jobId) {
+       private void testDeleteAlreadyDeleted(final JobID jobId) throws 
IOException {
                BlobServer server = null;
                BlobClient client = null;
                BlobStore blobStore = new VoidBlobStore();
@@ -201,10 +199,6 @@ public class BlobServerDeleteTest extends TestLogger {
                                server.delete(jobId, key);
                        }
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
                finally {
                        cleanup(server, client);
                }
@@ -219,16 +213,16 @@ public class BlobServerDeleteTest extends TestLogger {
        }
 
        @Test
-       public void testDeleteFailsNoJob() {
+       public void testDeleteFailsNoJob() throws IOException {
                testDeleteFails(null);
        }
 
        @Test
-       public void testDeleteFailsForJob() {
+       public void testDeleteFailsForJob() throws IOException {
                testDeleteFails(new JobID());
        }
 
-       private void testDeleteFails(final JobID jobId) {
+       private void testDeleteFails(final JobID jobId) throws IOException {
                assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't 
work on Windows.
 
                BlobServer server = null;
@@ -275,9 +269,6 @@ public class BlobServerDeleteTest extends TestLogger {
                        } else {
                                server.getFile(jobId, key);
                        }
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
                } finally {
                        if (blobFile != null && directory != null) {
                                //noinspection ResultOfMethodCallIgnored
@@ -290,6 +281,64 @@ public class BlobServerDeleteTest extends TestLogger {
        }
 
        /**
+        * Tests that {@link BlobServer} cleans up after calling {@link 
BlobServer#cleanupJob(JobID)}.
+        */
+       @Test
+       public void testJobCleanup() throws IOException, InterruptedException {
+
+               JobID jobId1 = new JobID();
+               List<BlobKey> keys1 = new ArrayList<BlobKey>();
+               JobID jobId2 = new JobID();
+               List<BlobKey> keys2 = new ArrayList<BlobKey>();
+               BlobServer server = null;
+
+               final byte[] buf = new byte[128];
+
+               try {
+                       Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                               temporaryFolder.newFolder().getAbsolutePath());
+
+                       server = new BlobServer(config, new VoidBlobStore());
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       BlobClient bc = new BlobClient(serverAddress, config);
+
+                       keys1.add(bc.put(jobId1, buf));
+                       keys2.add(bc.put(jobId2, buf));
+                       assertEquals(keys2.get(0), keys1.get(0));
+
+                       buf[0] += 1;
+                       keys1.add(bc.put(jobId1, buf));
+
+                       bc.close();
+
+                       assertEquals(2, checkFilesExist(jobId1, keys1, server, 
true));
+                       checkFileCountForJob(2, jobId1, server);
+                       assertEquals(1, checkFilesExist(jobId2, keys2, server, 
true));
+                       checkFileCountForJob(1, jobId2, server);
+
+                       server.cleanupJob(jobId1);
+
+                       checkFileCountForJob(0, jobId1, server);
+                       assertEquals(1, checkFilesExist(jobId2, keys2, server, 
true));
+                       checkFileCountForJob(1, jobId2, server);
+
+                       server.cleanupJob(jobId2);
+
+                       checkFileCountForJob(0, jobId1, server);
+                       checkFileCountForJob(0, jobId2, server);
+
+                       // calling a second time should not fail
+                       server.cleanupJob(jobId2);
+               }
+               finally {
+                       if (server != null) {
+                               server.close();
+                       }
+               }
+       }
+
+       /**
         * FLINK-6020
         *
         * Tests that concurrent delete operations don't interfere with each 
other.

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
index e449aab..a6ac447 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Before;
@@ -34,7 +35,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Mockito.mock;
 
-public class BlobUtilsTest {
+public class BlobUtilsTest extends TestLogger {
 
        private final static String CANNOT_CREATE_THIS = "cannot-create-this";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index ec1bbd8..c58e3a0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -190,26 +191,4 @@ public class CoordinatorShutdownTest extends TestLogger {
                }
        }
 
-       public static class FailingBlockingInvokable extends AbstractInvokable {
-               private static boolean blocking = true;
-               private static final Object lock = new Object();
-
-               @Override
-               public void invoke() throws Exception {
-                       while (blocking) {
-                               synchronized (lock) {
-                                       lock.wait();
-                               }
-                       }
-                       throw new RuntimeException("This exception is 
expected.");
-               }
-
-               public static void unblock() {
-                       blocking = false;
-
-                       synchronized (lock) {
-                               lock.notifyAll();
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 3814684..4237327 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -137,7 +136,7 @@ public class DispatcherTest extends TestLogger {
                                Configuration configuration,
                                RpcService rpcService,
                                HighAvailabilityServices 
highAvailabilityServices,
-                               BlobService blobService,
+                               BlobServer blobServer,
                                HeartbeatServices heartbeatServices,
                                MetricRegistry metricRegistry,
                                OnCompletionActions onCompleteActions,

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index b43a307..a4b48e8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -18,24 +18,22 @@
 
 package org.apache.flink.runtime.execution.librarycache;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.BlobService;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import static org.junit.Assert.*;
-import static org.junit.Assume.assumeTrue;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -45,7 +43,19 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
-public class BlobLibraryCacheManagerTest {
+import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
+import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Tests for {@link BlobLibraryCacheManager}.
+ */
+public class BlobLibraryCacheManagerTest extends TestLogger {
 
        @Rule
        public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -57,10 +67,13 @@ public class BlobLibraryCacheManagerTest {
        @Test
        public void testLibraryCacheManagerJobCleanup() throws IOException, 
InterruptedException {
 
-               JobID jid = new JobID();
-               List<BlobKey> keys = new ArrayList<BlobKey>();
+               JobID jobId1 = new JobID();
+               JobID jobId2 = new JobID();
+               List<BlobKey> keys1 = new ArrayList<>();
+               List<BlobKey> keys2 = new ArrayList<>();
                BlobServer server = null;
-               BlobLibraryCacheManager libraryCacheManager = null;
+               BlobCache cache = null;
+               BlobLibraryCacheManager libCache = null;
 
                final byte[] buf = new byte[128];
 
@@ -68,122 +81,231 @@ public class BlobLibraryCacheManagerTest {
                        Configuration config = new Configuration();
                        config.setString(BlobServerOptions.STORAGE_DIRECTORY,
                                temporaryFolder.newFolder().getAbsolutePath());
+                       config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
 
                        server = new BlobServer(config, new VoidBlobStore());
-                       InetSocketAddress blobSocketAddress = new 
InetSocketAddress(server.getPort());
-                       BlobClient bc = new BlobClient(blobSocketAddress, 
config);
-
-                       // TODO: make use of job-related BLOBs after adapting 
the BlobLibraryCacheManager
-                       JobID jobId = null;
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       BlobClient bc = new BlobClient(serverAddress, config);
+                       cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
 
-                       keys.add(bc.put(jobId, buf));
+                       keys1.add(bc.put(jobId1, buf));
                        buf[0] += 1;
-                       keys.add(bc.put(jobId, buf));
+                       keys1.add(bc.put(jobId1, buf));
+                       keys2.add(bc.put(jobId2, buf));
 
                        bc.close();
 
-                       long cleanupInterval = 1000L;
-                       libraryCacheManager = new 
BlobLibraryCacheManager(server, cleanupInterval);
-                       libraryCacheManager.registerJob(jid, keys, 
Collections.<URL>emptyList());
+                       libCache = new BlobLibraryCacheManager(cache);
+                       cache.registerJob(jobId1);
+                       cache.registerJob(jobId2);
+
+                       assertEquals(0, libCache.getNumberOfManagedJobs());
+                       assertEquals(0, 
libCache.getNumberOfReferenceHolders(jobId1));
+                       checkFileCountForJob(2, jobId1, server);
+                       checkFileCountForJob(0, jobId1, cache);
+                       checkFileCountForJob(1, jobId2, server);
+                       checkFileCountForJob(0, jobId2, cache);
+
+                       libCache.registerJob(jobId1, keys1, 
Collections.<URL>emptyList());
+                       ClassLoader classLoader1 = 
libCache.getClassLoader(jobId1);
+
+                       assertEquals(1, libCache.getNumberOfManagedJobs());
+                       assertEquals(1, 
libCache.getNumberOfReferenceHolders(jobId1));
+                       assertEquals(0, 
libCache.getNumberOfReferenceHolders(jobId2));
+                       assertEquals(2, checkFilesExist(jobId1, keys1, cache, 
true));
+                       checkFileCountForJob(2, jobId1, server);
+                       checkFileCountForJob(2, jobId1, cache);
+                       assertEquals(0, checkFilesExist(jobId2, keys2, cache, 
false));
+                       checkFileCountForJob(1, jobId2, server);
+                       checkFileCountForJob(0, jobId2, cache);
+
+                       libCache.registerJob(jobId2, keys2, 
Collections.<URL>emptyList());
+                       ClassLoader classLoader2 = 
libCache.getClassLoader(jobId2);
+                       assertNotEquals(classLoader1, classLoader2);
 
-                       assertEquals(2, checkFilesExist(jobId, keys, server, 
true));
-                       assertEquals(2, 
libraryCacheManager.getNumberOfCachedLibraries());
-                       assertEquals(1, 
libraryCacheManager.getNumberOfReferenceHolders(jid));
-
-                       libraryCacheManager.unregisterJob(jid);
-
-                       // because we cannot guarantee that there are not 
thread races in the build system, we
-                       // loop for a certain while until the references 
disappear
-                       {
-                               long deadline = System.currentTimeMillis() + 
30000;
-                               do {
-                                       Thread.sleep(500);
-                               }
-                               while 
(libraryCacheManager.getNumberOfCachedLibraries() > 0 &&
-                                       System.currentTimeMillis() < deadline);
+                       try {
+                               libCache.registerJob(jobId2, keys1, 
Collections.<URL>emptyList());
+                               fail("Should fail with an 
IllegalStateException");
+                       }
+                       catch (IllegalStateException e) {
+                               // that's what we want
                        }
-
-                       // this fails if we exited via a timeout
-                       assertEquals(0, 
libraryCacheManager.getNumberOfCachedLibraries());
-                       assertEquals(0, 
libraryCacheManager.getNumberOfReferenceHolders(jid));
-
-                       // the blob cache should no longer contain the files
-                       assertEquals(0, checkFilesExist(jobId, keys, server, 
false));
 
                        try {
-                               if (jobId == null) {
-                                       server.getFile(keys.get(0));
-                               } else {
-                                       server.getFile(jobId, keys.get(0));
-                               }
-                               fail("BLOB should have been deleted");
-                       } catch (IOException e) {
-                               // expected
+                               libCache.registerJob(
+                                       jobId2, keys2,
+                                       Collections.singletonList(new 
URL("file:///tmp/does-not-exist")));
+                               fail("Should fail with an 
IllegalStateException");
                        }
-                       try {
-                               if (jobId == null) {
-                                       server.getFile(keys.get(1));
-                               } else {
-                                       server.getFile(jobId, keys.get(1));
-                               }
-                               fail("BLOB should have been deleted");
-                       } catch (IOException e) {
-                               // expected
+                       catch (IllegalStateException e) {
+                               // that's what we want
                        }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
+
+                       assertEquals(2, libCache.getNumberOfManagedJobs());
+                       assertEquals(1, 
libCache.getNumberOfReferenceHolders(jobId1));
+                       assertEquals(1, 
libCache.getNumberOfReferenceHolders(jobId2));
+                       assertEquals(2, checkFilesExist(jobId1, keys1, cache, 
true));
+                       checkFileCountForJob(2, jobId1, server);
+                       checkFileCountForJob(2, jobId1, cache);
+                       assertEquals(1, checkFilesExist(jobId2, keys2, cache, 
true));
+                       checkFileCountForJob(1, jobId2, server);
+                       checkFileCountForJob(1, jobId2, cache);
+
+                       libCache.unregisterJob(jobId1);
+
+                       assertEquals(1, libCache.getNumberOfManagedJobs());
+                       assertEquals(0, 
libCache.getNumberOfReferenceHolders(jobId1));
+                       assertEquals(1, 
libCache.getNumberOfReferenceHolders(jobId2));
+                       assertEquals(2, checkFilesExist(jobId1, keys1, cache, 
true));
+                       checkFileCountForJob(2, jobId1, server);
+                       checkFileCountForJob(2, jobId1, cache);
+                       assertEquals(1, checkFilesExist(jobId2, keys2, cache, 
true));
+                       checkFileCountForJob(1, jobId2, server);
+                       checkFileCountForJob(1, jobId2, cache);
+
+                       libCache.unregisterJob(jobId2);
+
+                       assertEquals(0, libCache.getNumberOfManagedJobs());
+                       assertEquals(0, 
libCache.getNumberOfReferenceHolders(jobId1));
+                       assertEquals(0, 
libCache.getNumberOfReferenceHolders(jobId2));
+                       assertEquals(2, checkFilesExist(jobId1, keys1, cache, 
true));
+                       checkFileCountForJob(2, jobId1, server);
+                       checkFileCountForJob(2, jobId1, cache);
+                       assertEquals(1, checkFilesExist(jobId2, keys2, cache, 
true));
+                       checkFileCountForJob(1, jobId2, server);
+                       checkFileCountForJob(1, jobId2, cache);
+
+                       // only BlobCache#releaseJob() calls clean up files 
(tested in BlobCacheCleanupTest etc.)
                }
                finally {
-                       if (server != null) {
-                               server.close();
+                       if (libCache != null) {
+                               libCache.shutdown();
                        }
 
-                       if (libraryCacheManager != null) {
-                               try {
-                                       libraryCacheManager.shutdown();
-                               }
-                               catch (IOException e) {
-                                       e.printStackTrace();
-                               }
+                       // should have been closed by the libraryCacheManager, 
but just in case
+                       if (cache != null) {
+                               cache.close();
+                       }
+
+                       if (server != null) {
+                               server.close();
                        }
                }
        }
 
        /**
-        * Checks how many of the files given by blob keys are accessible.
-        *
-        * @param keys
-        *              blob keys to check
-        * @param blobService
-        *              BLOB store to use
-        * @param doThrow
-        *              whether exceptions should be ignored (<tt>false</tt>), 
or throws (<tt>true</tt>)
-        *
-        * @return number of files we were able to retrieve via {@link 
BlobService#getFile}
+        * Tests that the {@link BlobLibraryCacheManager} cleans up after 
calling {@link
+        * BlobLibraryCacheManager#unregisterTask(JobID, ExecutionAttemptID)}.
         */
-       private static int checkFilesExist(
-                       JobID jobId, List<BlobKey> keys, BlobService 
blobService, boolean doThrow)
-                       throws IOException {
-               int numFiles = 0;
+       @Test
+       public void testLibraryCacheManagerTaskCleanup() throws IOException, 
InterruptedException {
+
+               JobID jobId = new JobID();
+               ExecutionAttemptID attempt1 = new ExecutionAttemptID();
+               ExecutionAttemptID attempt2 = new ExecutionAttemptID();
+               List<BlobKey> keys = new ArrayList<>();
+               BlobServer server = null;
+               BlobCache cache = null;
+               BlobLibraryCacheManager libCache = null;
+
+               final byte[] buf = new byte[128];
+
+               try {
+                       Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                               temporaryFolder.newFolder().getAbsolutePath());
+                       config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+                       server = new BlobServer(config, new VoidBlobStore());
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       BlobClient bc = new BlobClient(serverAddress, config);
+                       cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
+
+                       keys.add(bc.put(jobId, buf));
+                       buf[0] += 1;
+                       keys.add(bc.put(jobId, buf));
+
+                       bc.close();
+
+                       libCache = new BlobLibraryCacheManager(cache);
+                       cache.registerJob(jobId);
+
+                       assertEquals(0, libCache.getNumberOfManagedJobs());
+                       assertEquals(0, 
libCache.getNumberOfReferenceHolders(jobId));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(0, jobId, cache);
+
+                       libCache.registerTask(jobId, attempt1, keys, 
Collections.<URL>emptyList());
+                       ClassLoader classLoader1 = 
libCache.getClassLoader(jobId);
+
+                       assertEquals(1, libCache.getNumberOfManagedJobs());
+                       assertEquals(1, 
libCache.getNumberOfReferenceHolders(jobId));
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(2, jobId, cache);
+
+                       libCache.registerTask(jobId, attempt2, keys, 
Collections.<URL>emptyList());
+                       ClassLoader classLoader2 = 
libCache.getClassLoader(jobId);
+                       assertEquals(classLoader1, classLoader2);
 
-               for (BlobKey key : keys) {
                        try {
-                               if (jobId == null) {
-                                       blobService.getFile(key);
-                               } else {
-                                       blobService.getFile(jobId, key);
-                               }
-                               ++numFiles;
-                       } catch (IOException e) {
-                               if (doThrow) {
-                                       throw e;
-                               }
+                               libCache.registerTask(
+                                       jobId, new ExecutionAttemptID(), 
Collections.<BlobKey>emptyList(),
+                                       Collections.<URL>emptyList());
+                               fail("Should fail with an 
IllegalStateException");
+                       }
+                       catch (IllegalStateException e) {
+                               // that's what we want
+                       }
+
+                       try {
+                               libCache.registerTask(
+                                       jobId, new ExecutionAttemptID(), keys,
+                                       Collections.singletonList(new 
URL("file:///tmp/does-not-exist")));
+                               fail("Should fail with an 
IllegalStateException");
+                       }
+                       catch (IllegalStateException e) {
+                               // that's what we want
                        }
+
+                       assertEquals(1, libCache.getNumberOfManagedJobs());
+                       assertEquals(2, 
libCache.getNumberOfReferenceHolders(jobId));
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(2, jobId, cache);
+
+                       libCache.unregisterTask(jobId, attempt1);
+
+                       assertEquals(1, libCache.getNumberOfManagedJobs());
+                       assertEquals(1, 
libCache.getNumberOfReferenceHolders(jobId));
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(2, jobId, cache);
+
+                       libCache.unregisterTask(jobId, attempt2);
+
+                       assertEquals(0, libCache.getNumberOfManagedJobs());
+                       assertEquals(0, 
libCache.getNumberOfReferenceHolders(jobId));
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(2, jobId, cache);
+
+                       // only BlobCache#releaseJob() calls clean up files 
(tested in BlobCacheCleanupTest etc.)
                }
+               finally {
+                       if (libCache != null) {
+                               libCache.shutdown();
+                       }
+
+                       // should have been closed by the libraryCacheManager, 
but just in case
+                       if (cache != null) {
+                               cache.close();
+                       }
 
-               return numFiles;
+                       if (server != null) {
+                               server.close();
+                       }
+               }
        }
 
        /**
@@ -191,14 +313,14 @@ public class BlobLibraryCacheManagerTest {
         * BlobLibraryCacheManager#unregisterTask(JobID, ExecutionAttemptID)}.
         */
        @Test
-       public void testLibraryCacheManagerTaskCleanup() throws IOException, 
InterruptedException {
+       public void testLibraryCacheManagerMixedJobTaskCleanup() throws 
IOException, InterruptedException {
 
-               JobID jid = new JobID();
-               ExecutionAttemptID executionId1 = new ExecutionAttemptID();
-               ExecutionAttemptID executionId2 = new ExecutionAttemptID();
-               List<BlobKey> keys = new ArrayList<BlobKey>();
+               JobID jobId = new JobID();
+               ExecutionAttemptID attempt1 = new ExecutionAttemptID();
+               List<BlobKey> keys = new ArrayList<>();
                BlobServer server = null;
-               BlobLibraryCacheManager libraryCacheManager = null;
+               BlobCache cache = null;
+               BlobLibraryCacheManager libCache = null;
 
                final byte[] buf = new byte[128];
 
@@ -206,67 +328,96 @@ public class BlobLibraryCacheManagerTest {
                        Configuration config = new Configuration();
                        config.setString(BlobServerOptions.STORAGE_DIRECTORY,
                                temporaryFolder.newFolder().getAbsolutePath());
+                       config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
 
                        server = new BlobServer(config, new VoidBlobStore());
-                       InetSocketAddress blobSocketAddress = new 
InetSocketAddress(server.getPort());
-                       BlobClient bc = new BlobClient(blobSocketAddress, 
config);
-
-                       // TODO: make use of job-related BLOBs after adapting 
the BlobLibraryCacheManager
-//                     JobID jobId = new JobID();
-                       JobID jobId = null;
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       BlobClient bc = new BlobClient(serverAddress, config);
+                       cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
 
                        keys.add(bc.put(jobId, buf));
                        buf[0] += 1;
                        keys.add(bc.put(jobId, buf));
 
-                       long cleanupInterval = 1000L;
-                       libraryCacheManager = new 
BlobLibraryCacheManager(server, cleanupInterval);
-                       libraryCacheManager.registerTask(jid, executionId1, 
keys, Collections.<URL>emptyList());
-                       libraryCacheManager.registerTask(jid, executionId2, 
keys, Collections.<URL>emptyList());
+                       bc.close();
 
-                       assertEquals(2, checkFilesExist(jobId, keys, server, 
true));
-                       assertEquals(2, 
libraryCacheManager.getNumberOfCachedLibraries());
-                       assertEquals(2, 
libraryCacheManager.getNumberOfReferenceHolders(jid));
+                       libCache = new BlobLibraryCacheManager(cache);
+                       cache.registerJob(jobId);
 
-                       libraryCacheManager.unregisterTask(jid, executionId1);
+                       assertEquals(0, libCache.getNumberOfManagedJobs());
+                       assertEquals(0, 
libCache.getNumberOfReferenceHolders(jobId));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(0, jobId, cache);
 
-                       assertEquals(2, checkFilesExist(jobId, keys, server, 
true));
-                       assertEquals(2, 
libraryCacheManager.getNumberOfCachedLibraries());
-                       assertEquals(1, 
libraryCacheManager.getNumberOfReferenceHolders(jid));
+                       libCache.registerJob(jobId, keys, 
Collections.<URL>emptyList());
+                       ClassLoader classLoader1 = 
libCache.getClassLoader(jobId);
 
-                       libraryCacheManager.unregisterTask(jid, executionId2);
+                       assertEquals(1, libCache.getNumberOfManagedJobs());
+                       assertEquals(1, 
libCache.getNumberOfReferenceHolders(jobId));
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(2, jobId, cache);
 
-                       // because we cannot guarantee that there are not 
thread races in the build system, we
-                       // loop for a certain while until the references 
disappear
-                       {
-                               long deadline = System.currentTimeMillis() + 
30000;
-                               do {
-                                       Thread.sleep(100);
-                               }
-                               while 
(libraryCacheManager.getNumberOfCachedLibraries() > 0 &&
-                                               System.currentTimeMillis() < 
deadline);
+                       libCache.registerTask(jobId, attempt1, keys, 
Collections.<URL>emptyList());
+                       ClassLoader classLoader2 = 
libCache.getClassLoader(jobId);
+                       assertEquals(classLoader1, classLoader2);
+
+                       try {
+                               libCache.registerTask(
+                                       jobId, new ExecutionAttemptID(), 
Collections.<BlobKey>emptyList(),
+                                       Collections.<URL>emptyList());
+                               fail("Should fail with an 
IllegalStateException");
+                       }
+                       catch (IllegalStateException e) {
+                               // that's what we want
                        }
 
-                       // this fails if we exited via a timeout
-                       assertEquals(0, 
libraryCacheManager.getNumberOfCachedLibraries());
-                       assertEquals(0, 
libraryCacheManager.getNumberOfReferenceHolders(jid));
+                       try {
+                               libCache.registerTask(
+                                       jobId, new ExecutionAttemptID(), keys,
+                                       Collections.singletonList(new 
URL("file:///tmp/does-not-exist")));
+                               fail("Should fail with an 
IllegalStateException");
+                       }
+                       catch (IllegalStateException e) {
+                               // that's what we want
+                       }
 
-                       // the blob cache should no longer contain the files
-                       assertEquals(0, checkFilesExist(jobId, keys, server, 
false));
+                       assertEquals(1, libCache.getNumberOfManagedJobs());
+                       assertEquals(2, 
libCache.getNumberOfReferenceHolders(jobId));
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(2, jobId, cache);
 
-                       bc.close();
-               } finally {
-                       if (server != null) {
-                               server.close();
+                       libCache.unregisterJob(jobId);
+
+                       assertEquals(1, libCache.getNumberOfManagedJobs());
+                       assertEquals(1, 
libCache.getNumberOfReferenceHolders(jobId));
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(2, jobId, cache);
+
+                       libCache.unregisterTask(jobId, attempt1);
+
+                       assertEquals(0, libCache.getNumberOfManagedJobs());
+                       assertEquals(0, 
libCache.getNumberOfReferenceHolders(jobId));
+                       assertEquals(2, checkFilesExist(jobId, keys, cache, 
true));
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(2, jobId, cache);
+
+                       // only BlobCache#releaseJob() calls clean up files 
(tested in BlobCacheCleanupTest etc.)
+               }
+               finally {
+                       if (libCache != null) {
+                               libCache.shutdown();
                        }
 
-                       if (libraryCacheManager != null) {
-                               try {
-                                       libraryCacheManager.shutdown();
-                               }
-                               catch (IOException e) {
-                                       e.printStackTrace();
-                               }
+                       // should have been closed by the libraryCacheManager, 
but just in case
+                       if (cache != null) {
+                               cache.close();
+                       }
+
+                       if (server != null) {
+                               server.close();
                        }
                }
        }
@@ -275,75 +426,103 @@ public class BlobLibraryCacheManagerTest {
        public void testRegisterAndDownload() throws IOException {
                assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't 
work on Windows.
 
+               JobID jobId = new JobID();
                BlobServer server = null;
                BlobCache cache = null;
+               BlobLibraryCacheManager libCache = null;
                File cacheDir = null;
                try {
                        // create the blob transfer services
                        Configuration config = new Configuration();
                        config.setString(BlobServerOptions.STORAGE_DIRECTORY,
                                temporaryFolder.newFolder().getAbsolutePath());
+                       config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 
1_000_000L);
 
                        server = new BlobServer(config, new VoidBlobStore());
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
                        cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
 
-                       // TODO: make use of job-related BLOBs after adapting 
the BlobLibraryCacheManager
-                       JobID jobId = null;
-
                        // upload some meaningless data to the server
                        BlobClient uploader = new BlobClient(serverAddress, 
config);
                        BlobKey dataKey1 = uploader.put(jobId, new byte[]{1, 2, 
3, 4, 5, 6, 7, 8});
                        BlobKey dataKey2 = uploader.put(jobId, new byte[]{11, 
12, 13, 14, 15, 16, 17, 18});
                        uploader.close();
 
-                       BlobLibraryCacheManager libCache = new 
BlobLibraryCacheManager(cache, 1000000000L);
-
-                       assertEquals(0, libCache.getNumberOfCachedLibraries());
+                       libCache = new BlobLibraryCacheManager(cache);
+                       assertEquals(0, libCache.getNumberOfManagedJobs());
+                       checkFileCountForJob(2, jobId, server);
+                       checkFileCountForJob(0, jobId, cache);
 
                        // first try to access a non-existing entry
+                       assertEquals(0, 
libCache.getNumberOfReferenceHolders(new JobID()));
                        try {
                                libCache.getClassLoader(new JobID());
                                fail("Should fail with an 
IllegalStateException");
                        }
                        catch (IllegalStateException e) {
-                               // that#s what we want
+                               // that's what we want
                        }
 
-                       // now register some BLOBs as libraries
+                       // register some BLOBs as libraries
                        {
-                               JobID jid = new JobID();
-                               ExecutionAttemptID executionId = new 
ExecutionAttemptID();
                                Collection<BlobKey> keys = 
Collections.singleton(dataKey1);
 
-                               libCache.registerTask(jid, executionId, keys, 
Collections.<URL>emptyList());
-                               assertEquals(1, 
libCache.getNumberOfReferenceHolders(jid));
-                               assertEquals(1, 
libCache.getNumberOfCachedLibraries());
-                               assertNotNull(libCache.getClassLoader(jid));
-
-                               // un-register them again
-                               libCache.unregisterTask(jid, executionId);
+                               cache.registerJob(jobId);
+                               ExecutionAttemptID executionId = new 
ExecutionAttemptID();
+                               libCache.registerTask(jobId, executionId, keys, 
Collections.<URL>emptyList());
+                               ClassLoader classLoader1 = 
libCache.getClassLoader(jobId);
+                               assertEquals(1, 
libCache.getNumberOfManagedJobs());
+                               assertEquals(1, 
libCache.getNumberOfReferenceHolders(jobId));
+                               assertEquals(1, checkFilesExist(jobId, keys, 
cache, true));
+                               checkFileCountForJob(2, jobId, server);
+                               checkFileCountForJob(1, jobId, cache);
+                               assertNotNull(libCache.getClassLoader(jobId));
+
+                               libCache.registerJob(jobId, keys, 
Collections.<URL>emptyList());
+                               ClassLoader classLoader2 = 
libCache.getClassLoader(jobId);
+                               assertEquals(classLoader1, classLoader2);
+                               assertEquals(1, 
libCache.getNumberOfManagedJobs());
+                               assertEquals(2, 
libCache.getNumberOfReferenceHolders(jobId));
+                               assertEquals(1, checkFilesExist(jobId, keys, 
cache, true));
+                               checkFileCountForJob(2, jobId, server);
+                               checkFileCountForJob(1, jobId, cache);
+                               assertNotNull(libCache.getClassLoader(jobId));
+
+                               // un-register the job
+                               libCache.unregisterJob(jobId);
+                               // still one task
+                               assertEquals(1, 
libCache.getNumberOfManagedJobs());
+                               assertEquals(1, 
libCache.getNumberOfReferenceHolders(jobId));
+                               assertEquals(1, checkFilesExist(jobId, keys, 
cache, true));
+                               checkFileCountForJob(2, jobId, server);
+                               checkFileCountForJob(1, jobId, cache);
+
+                               // unregister the task registration
+                               libCache.unregisterTask(jobId, executionId);
+                               assertEquals(0, 
libCache.getNumberOfManagedJobs());
+                               assertEquals(0, 
libCache.getNumberOfReferenceHolders(jobId));
+                               // changing the libCache registration does not 
influence the BLOB stores...
+                               checkFileCountForJob(2, jobId, server);
+                               checkFileCountForJob(1, jobId, cache);
 
                                // Don't fail if called again
-                               libCache.unregisterTask(jid, executionId);
+                               libCache.unregisterJob(jobId);
+                               assertEquals(0, 
libCache.getNumberOfManagedJobs());
+                               assertEquals(0, 
libCache.getNumberOfReferenceHolders(jobId));
 
-                               assertEquals(0, 
libCache.getNumberOfReferenceHolders(jid));
+                               libCache.unregisterTask(jobId, executionId);
+                               assertEquals(0, 
libCache.getNumberOfManagedJobs());
+                               assertEquals(0, 
libCache.getNumberOfReferenceHolders(jobId));
 
-                               // library is still cached (but not associated 
with job any more)
-                               assertEquals(1, 
libCache.getNumberOfCachedLibraries());
+                               cache.releaseJob(jobId);
 
-                               // should not be able to access the classloader 
any more
-                               try {
-                                       libCache.getClassLoader(jid);
-                                       fail("Should fail with an 
IllegalStateException");
-                               }
-                               catch (IllegalStateException e) {
-                                       // that's what we want
-                               }
+                               // library is still cached (but not associated 
with job any more)
+                               checkFileCountForJob(2, jobId, server);
+                               checkFileCountForJob(1, jobId, cache);
                        }
 
                        // see BlobUtils for the directory layout
-                       cacheDir = new File(cache.getStorageDir(), "no_job");
+                       cacheDir = cache.getStorageLocation(jobId, new 
BlobKey()).getParentFile();
                        assertTrue(cacheDir.exists());
 
                        // make sure no further blobs can be downloaded by 
removing the write
@@ -352,12 +531,14 @@ public class BlobLibraryCacheManagerTest {
 
                        // since we cannot download this library any more, this 
call should fail
                        try {
-                               libCache.registerTask(new JobID(), new 
ExecutionAttemptID(), Collections.singleton(dataKey2),
-                                               Collections.<URL>emptyList());
+                               cache.registerJob(jobId);
+                               libCache.registerTask(jobId, new 
ExecutionAttemptID(), Collections.singleton(dataKey2),
+                                       Collections.<URL>emptyList());
                                fail("This should fail with an IOException");
                        }
                        catch (IOException e) {
                                // splendid!
+                               cache.releaseJob(jobId);
                        }
                } finally {
                        if (cacheDir != null) {
@@ -368,6 +549,9 @@ public class BlobLibraryCacheManagerTest {
                        if (cache != null) {
                                cache.close();
                        }
+                       if (libCache != null) {
+                               libCache.shutdown();
+                       }
                        if (server != null) {
                                server.close();
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 2f6738d..e52310e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -49,6 +50,9 @@ import java.util.Random;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+/**
+ * Integration test for {@link BlobLibraryCacheManager}.
+ */
 public class BlobLibraryCacheRecoveryITCase extends TestLogger {
 
        @Rule
@@ -65,7 +69,6 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                InetSocketAddress[] serverAddress = new InetSocketAddress[2];
                BlobLibraryCacheManager[] libServer = new 
BlobLibraryCacheManager[2];
                BlobCache cache = null;
-               BlobLibraryCacheManager libCache = null;
                BlobStoreService blobStoreService = null;
 
                Configuration config = new Configuration();
@@ -75,6 +78,7 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                        temporaryFolder.newFolder().getAbsolutePath());
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
                        temporaryFolder.newFolder().getAbsolutePath());
+               config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600L);
 
                try {
                        blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
@@ -82,7 +86,7 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                        for (int i = 0; i < server.length; i++) {
                                server[i] = new BlobServer(config, 
blobStoreService);
                                serverAddress[i] = new 
InetSocketAddress("localhost", server[i].getPort());
-                               libServer[i] = new 
BlobLibraryCacheManager(server[i], 3600 * 1000);
+                               libServer[i] = new 
BlobLibraryCacheManager(server[i]);
                        }
 
                        // Random data
@@ -92,25 +96,22 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                        List<BlobKey> keys = new ArrayList<>(2);
 
                        JobID jobId = new JobID();
-                       // TODO: replace+adapt by jobId after adapting the 
BlobLibraryCacheManager
-                       JobID blobJobId = null;
 
                        // Upload some data (libraries)
                        try (BlobClient client = new 
BlobClient(serverAddress[0], config)) {
-                               keys.add(client.put(blobJobId, expected)); // 
Request 1
-                               keys.add(client.put(blobJobId, expected, 32, 
256)); // Request 2
+                               keys.add(client.put(jobId, expected)); // 
Request 1
+                               keys.add(client.put(jobId, expected, 32, 256)); 
// Request 2
                        }
 
                        // The cache
                        cache = new BlobCache(serverAddress[0], config, 
blobStoreService);
-                       libCache = new BlobLibraryCacheManager(cache, 3600 * 
1000);
 
                        // Register uploaded libraries
                        ExecutionAttemptID executionId = new 
ExecutionAttemptID();
                        libServer[0].registerTask(jobId, executionId, keys, 
Collections.<URL>emptyList());
 
                        // Verify key 1
-                       File f = cache.getFile(keys.get(0));
+                       File f = cache.getFile(jobId, keys.get(0));
                        assertEquals(expected.length, f.length());
 
                        try (FileInputStream fis = new FileInputStream(f)) {
@@ -123,13 +124,11 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
 
                        // Shutdown cache and start with other server
                        cache.close();
-                       libCache.shutdown();
 
                        cache = new BlobCache(serverAddress[1], config, 
blobStoreService);
-                       libCache = new BlobLibraryCacheManager(cache, 3600 * 
1000);
 
                        // Verify key 1
-                       f = cache.getFile(keys.get(0));
+                       f = cache.getFile(jobId, keys.get(0));
                        assertEquals(expected.length, f.length());
 
                        try (FileInputStream fis = new FileInputStream(f)) {
@@ -141,7 +140,7 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                        }
 
                        // Verify key 2
-                       f = cache.getFile(keys.get(1));
+                       f = cache.getFile(jobId, keys.get(1));
                        assertEquals(256, f.length());
 
                        try (FileInputStream fis = new FileInputStream(f)) {
@@ -154,8 +153,8 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
 
                        // Remove blobs again
                        try (BlobClient client = new 
BlobClient(serverAddress[1], config)) {
-                               client.delete(keys.get(0));
-                               client.delete(keys.get(1));
+                               client.delete(jobId, keys.get(0));
+                               client.delete(jobId, keys.get(1));
                        }
 
                        // Verify everything is clean below 
recoveryDir/<cluster_id>
@@ -167,6 +166,11 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                        assertEquals("Unclean state backend: " + 
Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
                }
                finally {
+                       for (BlobLibraryCacheManager s : libServer) {
+                               if (s != null) {
+                                       s.shutdown();
+                               }
+                       }
                        for (BlobServer s : server) {
                                if (s != null) {
                                        s.close();
@@ -177,10 +181,6 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                                cache.close();
                        }
 
-                       if (libCache != null) {
-                               libCache.shutdown();
-                       }
-
                        if (blobStoreService != null) {
                                blobStoreService.closeAndCleanupAllData();
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b236240/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
new file mode 100644
index 0000000..b2b455b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerCleanupITCase.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.FailingBlockingInvokable;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Small test to check that the {@link 
org.apache.flink.runtime.blob.BlobServer} cleanup is executed
+ * after job termination.
+ */
+public class JobManagerCleanupITCase extends TestLogger {
+
+       @Rule
+       public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+       private static ActorSystem system;
+
+       @BeforeClass
+       public static void setup() {
+               system = AkkaUtils.createLocalActorSystem(new Configuration());
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(system);
+       }
+
+       /**
+        * Specifies which test case to run in {@link 
#testBlobServerCleanup(TestCase)}.
+        */
+       private enum TestCase {
+               JOB_FINISHES_SUCESSFULLY,
+               JOB_IS_CANCELLED,
+               JOB_FAILS,
+               JOB_SUBMISSION_FAILS
+       }
+
+       /**
+        * Test cleanup for a job that finishes ordinarily.
+        */
+       @Test
+       public void testBlobServerCleanupFinishedJob() throws IOException {
+               testBlobServerCleanup(TestCase.JOB_FINISHES_SUCESSFULLY);
+       }
+
+       /**
+        * Test cleanup for a job which is cancelled after submission.
+        */
+       @Test
+       public void testBlobServerCleanupCancelledJob() throws IOException {
+               testBlobServerCleanup(TestCase.JOB_IS_CANCELLED);
+       }
+
+       /**
+        * Test cleanup for a job that fails (first a task fails, then the job 
recovers, then the whole
+        * job fails due to a limited restart policy).
+        */
+       @Test
+       public void testBlobServerCleanupFailedJob() throws IOException {
+               testBlobServerCleanup(TestCase.JOB_FAILS);
+       }
+
+       /**
+        * Test cleanup for a job whose submission fails (emulated by 
referencing an additional BLOB that
+        * is not present).
+        */
+       @Test
+       public void testBlobServerCleanupFailedSubmission() throws IOException {
+               testBlobServerCleanup(TestCase.JOB_SUBMISSION_FAILS);
+       }
+
+       private void testBlobServerCleanup(final TestCase testCase) throws 
IOException {
+               final int num_tasks = 2;
+               final File blobBaseDir = tmpFolder.newFolder();
+
+               new JavaTestKit(system) {{
+                       new Within(duration("30 seconds")) {
+                               @Override
+                               protected void run() {
+                                       // Setup
+
+                                       TestingCluster cluster = null;
+                                       BlobClient bc = null;
+
+                                       try {
+                                               Configuration config = new 
Configuration();
+                                               
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+                                               
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+                                               
config.setString(AkkaOptions.ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT());
+                                               
config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
blobBaseDir.getAbsolutePath());
+
+                                               
config.setString(ConfigConstants.RESTART_STRATEGY, "fixeddelay");
+                                               
config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+                                               
config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "1 s");
+                                               // BLOBs are deleted from 
BlobCache between 1s and 2s after last reference
+                                               // -> the BlobCache may still 
have the BLOB or not (let's test both cases randomly)
+                                               
config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L);
+
+                                               cluster = new 
TestingCluster(config);
+                                               cluster.start();
+
+                                               final ActorGateway 
jobManagerGateway = cluster.getLeaderGateway(
+                                                       
TestingUtils.TESTING_DURATION());
+
+                                               // we can use the default 
leader session ID because we don't use this gateway to send messages
+                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(),
+                                                       
HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+                                               // Create a task
+
+                                               JobVertex source = new 
JobVertex("Source");
+                                               if (testCase == 
TestCase.JOB_FAILS || testCase == TestCase.JOB_IS_CANCELLED) {
+                                                       
source.setInvokableClass(FailingBlockingInvokable.class);
+                                               } else {
+                                                       
source.setInvokableClass(NoOpInvokable.class);
+                                               }
+                                               
source.setParallelism(num_tasks);
+
+                                               JobGraph jobGraph = new 
JobGraph("BlobCleanupTest", source);
+                                               final JobID jid = 
jobGraph.getJobID();
+
+                                               // request the blob port from 
the job manager
+                                               Future<Object> future = 
jobManagerGateway
+                                                       
.ask(JobManagerMessages.getRequestBlobManagerPort(), remaining());
+                                               int blobPort = (Integer) 
Await.result(future, remaining());
+
+                                               // upload a blob
+                                               BlobKey key1;
+                                               bc = new BlobClient(new 
InetSocketAddress("localhost", blobPort),
+                                                       config);
+                                               try {
+                                                       key1 = bc.put(jid, new 
byte[10]);
+                                               } finally {
+                                                       bc.close();
+                                               }
+                                               jobGraph.addBlob(key1);
+
+                                               if (testCase == 
TestCase.JOB_SUBMISSION_FAILS) {
+                                                       // add an invalid key 
so that the submission fails
+                                                       jobGraph.addBlob(new 
BlobKey());
+                                               }
+
+                                               // Submit the job and wait for 
the submission result (success or failure message)
+                                               jobManagerGateway.tell(
+                                                       new 
JobManagerMessages.SubmitJob(
+                                                               jobGraph,
+                                                               
ListeningBehaviour.EXECUTION_RESULT),
+                                                       testActorGateway);
+                                               if (testCase == 
TestCase.JOB_SUBMISSION_FAILS) {
+                                                       
expectMsgClass(JobManagerMessages.JobResultFailure.class);
+                                               } else {
+                                                       
expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+                                                       if (testCase == 
TestCase.JOB_FAILS) {
+                                                               // fail a task 
so that the job is going to be recovered (we actually do not
+                                                               // need the 
blocking part of the invokable and can start throwing right away)
+                                                               
FailingBlockingInvokable.unblock();
+
+                                                               // job will get 
restarted, BlobCache may re-download the BLOB if already deleted
+                                                               // then the 
tasks will fail again and the restart strategy will finalise the job
+
+                                                               
expectMsgClass(JobManagerMessages.JobResultFailure.class);
+                                                       } else if (testCase == 
TestCase.JOB_IS_CANCELLED) {
+                                                               
jobManagerGateway.tell(
+                                                                       new 
JobManagerMessages.CancelJob(jid),
+                                                                       
testActorGateway);
+                                                               
expectMsgClass(JobManagerMessages.CancellationResponse.class);
+
+                                                               // job will be 
cancelled and everything should be cleaned up
+
+                                                               
expectMsgClass(JobManagerMessages.JobResultFailure.class);
+                                                       } else {
+                                                               
expectMsgClass(JobManagerMessages.JobResultSuccess.class);
+                                                       }
+                                               }
+
+                                               // both BlobServer and 
BlobCache should eventually delete all files
+
+                                               File[] blobDirs = 
blobBaseDir.listFiles(new FilenameFilter() {
+                                                       @Override
+                                                       public boolean 
accept(File dir, String name) {
+                                                               return 
name.startsWith("blobStore-");
+                                                       }
+                                               });
+                                               assertNotNull(blobDirs);
+                                               for (File blobDir : blobDirs) {
+                                                       
waitForEmptyBlobDir(blobDir, remaining());
+                                               }
+
+                                       } catch (Exception e) {
+                                               e.printStackTrace();
+                                               fail(e.getMessage());
+                                       } finally {
+                                               if (bc != null) {
+                                                       try {
+                                                               bc.close();
+                                                       } catch (IOException 
ignored) {
+                                                       }
+                                               }
+                                               if (cluster != null) {
+                                                       cluster.shutdown();
+                                               }
+                                       }
+                               }
+                       };
+               }};
+
+               // after everything has been shut down, the storage directory 
itself should be empty
+               assertArrayEquals(new File[] {}, blobBaseDir.listFiles());
+       }
+
+       /**
+        * Waits until the given {@link 
org.apache.flink.runtime.blob.BlobService} storage directory
+        * does not contain any job-related folders any more.
+        *
+        * @param blobDir
+        *              directory of a {@link 
org.apache.flink.runtime.blob.BlobServer} or {@link
+        *              org.apache.flink.runtime.blob.BlobCache}
+        * @param remaining
+        *              remaining time for this test
+        *
+        * @see org.apache.flink.runtime.blob.BlobUtils
+        */
+       private static void waitForEmptyBlobDir(File blobDir, FiniteDuration 
remaining)
+               throws InterruptedException {
+               long deadline = System.currentTimeMillis() + 
remaining.toMillis();
+               String[] blobDirContents;
+               do {
+                       blobDirContents = blobDir.list(new FilenameFilter() {
+                               @Override
+                               public boolean accept(File dir, String name) {
+                                       return name.startsWith("job_");
+                               }
+                       });
+                       if (blobDirContents == null || blobDirContents.length 
== 0) {
+                               return;
+                       }
+                       Thread.sleep(100);
+               } while (System.currentTimeMillis() < deadline);
+
+               fail("Timeout while waiting for " + blobDir.getAbsolutePath() + 
" to become empty. Current contents: " + Arrays.toString(blobDirContents));
+       }
+}

Reply via email to