[FLINK-6519] Integrate BlobStore in lifecycle management of 
HighAvailabilityServices

The HighAvailabilityService creates a single BlobStoreService instance which is
shared by all BlobServer and BlobCache instances. The BlobStoreService's 
lifecycle
is exclusively managed by the HighAvailabilityServices. This means that the
BlobStore's content is only cleaned up if the HighAvailabilityService's HA data
is cleaned up. Having this single point of control, makes it easier to decide 
when
to discard HA data (e.g. in case of a successful job execution) and when to 
retain
the data (e.g. for recovery).

Close and cleanup all data of BlobStore in HighAvailabilityServices

Use HighAvailabilityServices to create BlobStore

Introduce BlobStoreService interface to hide close and closeAndCleanupAllData 
methods

This closes #3864.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88b0f2ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88b0f2ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88b0f2ac

Branch: refs/heads/master
Commit: 88b0f2ac3fd788932d7f434ca57ba3718c3fa621
Parents: ebc1368
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue May 9 10:26:37 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed May 17 08:18:49 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/hdfstests/HDFSTest.java    | 14 ++-
 .../clusterframework/MesosTaskManager.scala     |  6 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 26 +++++-
 .../handlers/TaskManagerLogHandler.java         | 11 ++-
 .../webmonitor/WebRuntimeMonitorITCase.java     | 14 ++-
 .../handlers/TaskManagerLogHandlerTest.java     | 11 ++-
 .../apache/flink/runtime/blob/BlobCache.java    | 80 ++++-------------
 .../apache/flink/runtime/blob/BlobServer.java   | 38 ++++----
 .../apache/flink/runtime/blob/BlobService.java  |  8 +-
 .../apache/flink/runtime/blob/BlobStore.java    | 26 +-----
 .../flink/runtime/blob/BlobStoreService.java    | 32 +++++++
 .../apache/flink/runtime/blob/BlobUtils.java    | 44 +++++++--
 .../org/apache/flink/runtime/blob/BlobView.java | 49 +++++++++++
 .../flink/runtime/blob/FileSystemBlobStore.java | 11 ++-
 .../flink/runtime/blob/VoidBlobStore.java       |  8 +-
 .../apache/flink/runtime/client/JobClient.java  | 13 ++-
 .../runtime/client/JobListeningContext.java     |  6 +-
 .../clusterframework/BootstrapTools.java        |  8 +-
 .../librarycache/BlobLibraryCacheManager.java   |  2 +-
 .../HighAvailabilityServicesUtils.java          | 12 ++-
 .../nonha/AbstractNonHaServices.java            |  5 +-
 .../zookeeper/ZooKeeperHaServices.java          | 93 ++++++++++----------
 .../runtime/jobmaster/JobManagerServices.java   |  2 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  5 +-
 .../runtime/webmonitor/WebMonitorUtils.java     | 21 +++--
 .../flink/runtime/jobmanager/JobManager.scala   | 21 ++---
 .../runtime/minicluster/FlinkMiniCluster.scala  |  8 +-
 .../minicluster/LocalFlinkMiniCluster.scala     |  8 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 36 +++++---
 .../runtime/blob/BlobCacheRetriesTest.java      | 79 ++++++++---------
 .../runtime/blob/BlobCacheSuccessTest.java      | 56 ++++++------
 .../flink/runtime/blob/BlobClientSslTest.java   | 52 +++++------
 .../flink/runtime/blob/BlobClientTest.java      | 16 ++--
 .../flink/runtime/blob/BlobRecoveryITCase.java  | 21 +++--
 .../runtime/blob/BlobServerDeleteTest.java      | 21 +++--
 .../flink/runtime/blob/BlobServerGetTest.java   | 41 +++------
 .../flink/runtime/blob/BlobServerPutTest.java   | 89 +++++--------------
 .../flink/runtime/blob/BlobServerRangeTest.java | 10 +--
 .../runtime/blob/TestingFailingBlobServer.java  |  4 +-
 .../BlobLibraryCacheManagerTest.java            | 33 +++----
 .../BlobLibraryCacheRecoveryITCase.java         | 21 +++--
 .../zookeeper/ZooKeeperRegistryTest.java        |  7 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  9 +-
 .../JobManagerLeaderElectionTest.java           |  3 +-
 .../ZooKeeperLeaderRetrievalTest.java           | 13 ++-
 .../runtime/metrics/TaskManagerMetricsTest.java |  2 +-
 ...askManagerComponentsStartupShutdownTest.java |  5 +-
 .../TaskManagerRegistrationTest.java            |  5 +-
 .../src/test/resources/log4j-test.properties    |  2 +-
 .../jobmanager/JobManagerRegistrationTest.scala |  3 +-
 .../testingUtils/TestingTaskManager.scala       | 40 ++++-----
 ...agerHAProcessFailureBatchRecoveryITCase.java |  1 +
 .../flink/yarn/TestingYarnTaskManager.scala     | 25 +++---
 .../YarnHighAvailabilityServices.java           | 30 ++++++-
 .../org/apache/flink/yarn/YarnTaskManager.scala |  6 +-
 55 files changed, 667 insertions(+), 545 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index 8a3f662..0815863 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -31,6 +31,8 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.examples.java.wordcount.WordCount;
 import org.apache.flink.runtime.blob.BlobRecoveryITCase;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.FileUtils;
@@ -234,7 +236,17 @@ public class HDFSTest {
                config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
hdfsURI);
 
-               BlobRecoveryITCase.testBlobServerRecovery(config);
+               BlobStoreService blobStoreService = null;
+
+               try {
+                       blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
+
+                       BlobRecoveryITCase.testBlobServerRecovery(config, 
blobStoreService);
+               } finally {
+                       if (blobStoreService != null) {
+                               blobStoreService.closeAndCleanupAllData();
+                       }
+               }
        }
 
        // package visible

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index e8d6a58..7834639 100644
--- 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.mesos.runtime.clusterframework
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
@@ -38,7 +38,7 @@ class MesosTaskManager(
     ioManager: IOManager,
     network: NetworkEnvironment,
     numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService,
+    highAvailabilityServices: HighAvailabilityServices,
     metricRegistry : MetricRegistry)
   extends TaskManager(
     config,
@@ -48,7 +48,7 @@ class MesosTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService,
+    highAvailabilityServices,
     metricRegistry) {
 
   override def handleMessage: Receive = {

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 03b53ad..10d7c6c 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -148,6 +149,7 @@ public class WebRuntimeMonitor implements WebMonitor {
        public WebRuntimeMonitor(
                        Configuration config,
                        LeaderRetrievalService leaderRetrievalService,
+                       BlobView blobView,
                        ActorSystem actorSystem) throws IOException, 
InterruptedException {
 
                this.leaderRetrievalService = 
checkNotNull(leaderRetrievalService);
@@ -269,10 +271,26 @@ public class WebRuntimeMonitor implements WebMonitor {
                GET(router, new JobMetricsHandler(metricFetcher));
 
                GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, 
metricFetcher));
-               GET(router, new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout,
-                               TaskManagerLogHandler.FileMode.LOG, config, 
enableSSL));
-               GET(router, new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout,
-                               TaskManagerLogHandler.FileMode.STDOUT, config, 
enableSSL));
+               GET(router,
+                       new TaskManagerLogHandler(
+                               retriever,
+                               context,
+                               jobManagerAddressPromise.future(),
+                               timeout,
+                               TaskManagerLogHandler.FileMode.LOG,
+                               config,
+                               enableSSL,
+                               blobView));
+               GET(router,
+                       new TaskManagerLogHandler(
+                               retriever,
+                               context,
+                               jobManagerAddressPromise.future(),
+                               timeout,
+                               TaskManagerLogHandler.FileMode.STDOUT,
+                               config,
+                               enableSSL,
+                               blobView));
                GET(router, new TaskManagerMetricsHandler(metricFetcher));
 
                router

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 37ee814..53ee336 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -50,6 +50,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,6 +118,8 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
 
        private final Time timeTimeout;
 
+       private final BlobView blobView;
+
        public enum FileMode {
                LOG,
                STDOUT
@@ -128,7 +132,8 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
                FiniteDuration timeout,
                FileMode fileMode,
                Configuration config,
-               boolean httpsEnabled) {
+               boolean httpsEnabled,
+               BlobView blobView) {
                super(retriever, localJobManagerAddressPromise, timeout, 
httpsEnabled);
 
                this.executor = checkNotNull(executor);
@@ -142,6 +147,8 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
                                break;
                }
 
+               this.blobView = Preconditions.checkNotNull(blobView, 
"blobView");
+
                timeTimeout = Time.milliseconds(timeout.toMillis());
        }
 
@@ -167,7 +174,7 @@ public class TaskManagerLogHandler extends 
RuntimeMonitorHandlerBase {
                                        Option<String> hostOption = 
jobManager.actor().path().address().host();
                                        String host = hostOption.isDefined() ? 
hostOption.get() : "localhost";
                                        int port = (int) result;
-                                       return new BlobCache(new 
InetSocketAddress(host, port), config);
+                                       return new BlobCache(new 
InetSocketAddress(host, port), config, blobView);
                                }
                        }, executor);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index a51a234..cd5a2b7 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -154,6 +155,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                                webMonitor[i] = new WebRuntimeMonitor(
                                        config,
                                        
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+                                       
highAvailabilityServices.createBlobStore(),
                                        jobManagerSystem[i]);
                        }
 
@@ -294,9 +296,11 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 
                        actorSystem = AkkaUtils.createDefaultActorSystem();
 
-                       LeaderRetrievalService leaderRetrievalService = 
mock(LeaderRetrievalService.class);
                        webRuntimeMonitor = new WebRuntimeMonitor(
-                                       config, leaderRetrievalService, 
actorSystem);
+                               config,
+                               mock(LeaderRetrievalService.class),
+                               mock(BlobView.class),
+                               actorSystem);
 
                        webRuntimeMonitor.start("akka://schmakka");
 
@@ -467,10 +471,12 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                config.setInteger(JobManagerOptions.WEB_PORT, 0);
                config.setString(JobManagerOptions.WEB_LOG_PATH, 
logFile.toString());
 
+               HighAvailabilityServices highAvailabilityServices = 
flink.highAvailabilityServices();
+
                WebRuntimeMonitor webMonitor = new WebRuntimeMonitor(
                        config,
-                       
flink.highAvailabilityServices().getJobManagerLeaderRetriever(
-                               HighAvailabilityServices.DEFAULT_JOB_ID),
+                       
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+                       highAvailabilityServices.createBlobStore(),
                        jmActorSystem);
 
                webMonitor.start(jobManagerAddress);

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
index 4177f44..3d8f1a3 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -53,7 +54,6 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.mockito.Matchers.any;
@@ -71,7 +71,8 @@ public class TaskManagerLogHandlerTest {
                        AkkaUtils.getDefaultClientTimeout(),
                        TaskManagerLogHandler.FileMode.LOG,
                        new Configuration(),
-                       false);
+                       false,
+                       new VoidBlobStore());
                String[] pathsLog = handlerLog.getPaths();
                Assert.assertEquals(1, pathsLog.length);
                Assert.assertEquals("/taskmanagers/:taskmanagerid/log", 
pathsLog[0]);
@@ -83,7 +84,8 @@ public class TaskManagerLogHandlerTest {
                        AkkaUtils.getDefaultClientTimeout(),
                        TaskManagerLogHandler.FileMode.STDOUT,
                        new Configuration(),
-                       false);
+                       false,
+                       new VoidBlobStore());
                String[] pathsOut = handlerOut.getPaths();
                Assert.assertEquals(1, pathsOut.length);
                Assert.assertEquals("/taskmanagers/:taskmanagerid/stdout", 
pathsOut[0]);
@@ -131,7 +133,8 @@ public class TaskManagerLogHandlerTest {
                        AkkaUtils.getDefaultClientTimeout(),
                        TaskManagerLogHandler.FileMode.LOG,
                        new Configuration(),
-                       false);
+                       false,
+                       new VoidBlobStore());
 
                final AtomicReference<String> exception = new 
AtomicReference<>();
                

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 23c7e63..aa47eae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.IOUtils;
 import org.slf4j.Logger;
@@ -58,7 +57,7 @@ public final class BlobCache implements BlobService {
        private final File storageDir;
 
        /** Blob store for distributed file storage, e.g. in HA */
-       private final BlobStore blobStore;
+       private final BlobView blobView;
 
        private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
@@ -78,55 +77,19 @@ public final class BlobCache implements BlobService {
         *              address of the {@link BlobServer} to use for fetching 
files from
         * @param blobClientConfig
         *              global configuration
-        *
-        * @throws IOException
-        *              thrown if the (local or distributed) file storage 
cannot be created or
-        *              is not usable
-        */
-       public BlobCache(InetSocketAddress serverAddress,
-                       Configuration blobClientConfig) throws IOException {
-               this(serverAddress, blobClientConfig,
-                       BlobUtils.createBlobStoreFromConfig(blobClientConfig));
-       }
-
-       /**
-        * Instantiates a new BLOB cache.
-        *
-        * @param serverAddress
-        *              address of the {@link BlobServer} to use for fetching 
files from
-        * @param blobClientConfig
-        *              global configuration
-        *      @param haServices
-        *              high availability services able to create a distributed 
blob store
-        *
-        * @throws IOException
-        *              thrown if the (local or distributed) file storage 
cannot be created or
-        *              is not usable
-        */
-       public BlobCache(InetSocketAddress serverAddress,
-               Configuration blobClientConfig, HighAvailabilityServices 
haServices) throws IOException {
-               this(serverAddress, blobClientConfig, 
haServices.createBlobStore());
-       }
-
-       /**
-        * Instantiates a new BLOB cache.
-        *
-        * @param serverAddress
-        *              address of the {@link BlobServer} to use for fetching 
files from
-        * @param blobClientConfig
-        *              global configuration
-        * @param blobStore
+        * @param blobView
         *              (distributed) blob store file system to retrieve files 
from first
         *
         * @throws IOException
         *              thrown if the (local or distributed) file storage 
cannot be created or is not usable
         */
-       private BlobCache(
-                       final InetSocketAddress serverAddress, final 
Configuration blobClientConfig,
-                       final BlobStore blobStore) throws IOException {
+       public BlobCache(
+                       final InetSocketAddress serverAddress,
+                       final Configuration blobClientConfig,
+                       final BlobView blobView) throws IOException {
                this.serverAddress = checkNotNull(serverAddress);
                this.blobClientConfig = checkNotNull(blobClientConfig);
-               this.blobStore = blobStore;
+               this.blobView = checkNotNull(blobView, "blobStore");
 
                // configure and create the storage directory
                String storageDirectory = 
blobClientConfig.getString(BlobServerOptions.STORAGE_DIRECTORY);
@@ -168,7 +131,7 @@ public final class BlobCache implements BlobService {
 
                // first try the distributed blob store (if available)
                try {
-                       blobStore.get(requiredBlob, localJarFile);
+                       blobView.get(requiredBlob, localJarFile);
                } catch (Exception e) {
                        LOG.info("Failed to copy from blob store. Downloading 
from BLOB server instead.", e);
                }
@@ -293,28 +256,23 @@ public final class BlobCache implements BlobService {
        }
 
        @Override
-       public void shutdown() {
+       public void close() throws IOException {
                if (shutdownRequested.compareAndSet(false, true)) {
                        LOG.info("Shutting down BlobCache");
 
                        // Clean up the storage directory
                        try {
                                FileUtils.deleteDirectory(storageDir);
-                       }
-                       catch (IOException e) {
-                               LOG.error("BLOB cache failed to properly clean 
up its storage directory.");
-                       }
-
-                       // Remove shutdown hook to prevent resource leaks, 
unless this is invoked by the shutdown hook itself
-                       if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
-                               try {
-                                       
Runtime.getRuntime().removeShutdownHook(shutdownHook);
-                               }
-                               catch (IllegalStateException e) {
-                                       // race, JVM is in shutdown already, we 
can safely ignore this
-                               }
-                               catch (Throwable t) {
-                                       LOG.warn("Exception while unregistering 
BLOB cache's cleanup shutdown hook.");
+                       } finally {
+                               // Remove shutdown hook to prevent resource 
leaks, unless this is invoked by the shutdown hook itself
+                               if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
+                                       try {
+                                               
Runtime.getRuntime().removeShutdownHook(shutdownHook);
+                                       } catch (IllegalStateException e) {
+                                               // race, JVM is in shutdown 
already, we can safely ignore this
+                                       } catch (Throwable t) {
+                                               LOG.warn("Exception while 
unregistering BLOB cache's cleanup shutdown hook.");
+                                       }
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 0e15777..937eab0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
 
@@ -94,19 +94,14 @@ public class BlobServer extends Thread implements 
BlobService {
        /**
         * Instantiates a new BLOB server and binds it to a free network port.
         *
+        * @param config Configuration to be used to instantiate the BlobServer
+        * @param blobStore BlobStore to store blobs persistently
+        *
         * @throws IOException
         *              thrown if the BLOB server cannot bind to a free network 
port or if the
         *              (local or distributed) file storage cannot be created 
or is not usable
         */
-       public BlobServer(Configuration config) throws IOException {
-               this(config, BlobUtils.createBlobStoreFromConfig(config));
-       }
-
-       public BlobServer(Configuration config, HighAvailabilityServices 
haServices) throws IOException {
-               this(config, haServices.createBlobStore());
-       }
-
-       private BlobServer(Configuration config, BlobStore blobStore) throws 
IOException {
+       public BlobServer(Configuration config, BlobStore blobStore) throws 
IOException {
                this.blobServiceConfiguration = checkNotNull(config);
                this.blobStore = checkNotNull(blobStore);
 
@@ -269,7 +264,12 @@ public class BlobServer extends Thread implements 
BlobService {
                catch (Throwable t) {
                        if (!this.shutdownRequested.get()) {
                                LOG.error("BLOB server stopped working. 
Shutting down", t);
-                               shutdown();
+
+                               try {
+                                       close();
+                               } catch (Throwable closeThrowable) {
+                                       LOG.error("Could not properly close the 
BlobServer.", closeThrowable);
+                               }
                        }
                }
        }
@@ -278,13 +278,15 @@ public class BlobServer extends Thread implements 
BlobService {
         * Shuts down the BLOB server.
         */
        @Override
-       public void shutdown() {
+       public void close() throws IOException {
                if (shutdownRequested.compareAndSet(false, true)) {
+                       Exception exception = null;
+
                        try {
                                this.serverSocket.close();
                        }
                        catch (IOException ioe) {
-                               LOG.debug("Error while closing the server 
socket.", ioe);
+                               exception = ioe;
                        }
 
                        // wake the thread up, in case it is waiting on some 
operation
@@ -294,13 +296,15 @@ public class BlobServer extends Thread implements 
BlobService {
                                join();
                        }
                        catch (InterruptedException ie) {
+                               Thread.currentThread().interrupt();
+
                                LOG.debug("Error while waiting for this thread 
to die.", ie);
                        }
 
                        synchronized (activeConnections) {
                                if (!activeConnections.isEmpty()) {
                                        for (BlobServerConnection conn : 
activeConnections) {
-                                               LOG.debug("Shutting down 
connection " + conn.getName());
+                                               LOG.debug("Shutting down 
connection {}.", conn.getName());
                                                conn.close();
                                        }
                                        activeConnections.clear();
@@ -312,7 +316,7 @@ public class BlobServer extends Thread implements 
BlobService {
                                FileUtils.deleteDirectory(storageDir);
                        }
                        catch (IOException e) {
-                               LOG.error("BLOB server failed to properly clean 
up its storage directory.");
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                        }
 
                        // Remove shutdown hook to prevent resource leaks, 
unless this is invoked by the
@@ -325,13 +329,15 @@ public class BlobServer extends Thread implements 
BlobService {
                                        // race, JVM is in shutdown already, we 
can safely ignore this
                                }
                                catch (Throwable t) {
-                                       LOG.warn("Exception while unregistering 
BLOB server's cleanup shutdown hook.");
+                                       LOG.warn("Exception while unregistering 
BLOB server's cleanup shutdown hook.", t);
                                }
                        }
 
                        if(LOG.isInfoEnabled()) {
                                LOG.info("Stopped BLOB server at {}:{}", 
serverSocket.getInetAddress().getHostAddress(), getPort());
                        }
+
+                       ExceptionUtils.tryRethrowIOException(exception);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index 419ee8d..97a2d51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.blob;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.URL;
 
 /**
  * A simple store and retrieve binary large objects (BLOBs).
  */
-public interface BlobService {
+public interface BlobService extends Closeable {
 
        /**
         * This method returns the URL of the file associated with the provided 
blob key.
@@ -49,11 +50,6 @@ public interface BlobService {
         * @return the port of the blob service.
         */
        int getPort();
-
-       /**
-        * Shutdown method which is called to terminate the blob service.
-        */
-       void shutdown();
        
        BlobClient createClient() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 64dc942..4c26a5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 /**
  * A blob store.
  */
-public interface BlobStore {
+public interface BlobStore extends BlobView {
 
        /**
         * Copies the local file to the blob store.
@@ -50,25 +50,6 @@ public interface BlobStore {
        void put(File localFile, JobID jobId, String key) throws IOException;
 
        /**
-        * Copies a blob to a local file.
-        *
-        * @param blobKey   The blob ID
-        * @param localFile The local file to copy to
-        * @throws IOException If the copy fails
-        */
-       void get(BlobKey blobKey, File localFile) throws IOException;
-
-       /**
-        * Copies a blob to a local file.
-        *
-        * @param jobId     The JobID part of ID for the blob
-        * @param key       The String part of ID for the blob
-        * @param localFile The local file to copy to
-        * @throws IOException If the copy fails
-        */
-       void get(JobID jobId, String key, File localFile) throws IOException;
-
-       /**
         * Tries to delete a blob from storage.
         *
         * <p>NOTE: This also tries to delete any created directories if 
empty.</p>
@@ -95,9 +76,4 @@ public interface BlobStore {
         * @param jobId The JobID part of all blobs to delete
         */
        void deleteAll(JobID jobId);
-
-       /**
-        * Cleans up the store and deletes all blobs.
-        */
-       void cleanUp();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
new file mode 100644
index 0000000..83cd9d4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStoreService.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import java.io.Closeable;
+
+/**
+ * Service interface for the BlobStore which allows to close and clean up its 
data.
+ */
+public interface BlobStoreService extends BlobStore, Closeable {
+
+       /**
+        * Closes and cleans up the store. This entails the deletion of all 
blobs.
+        */
+       void closeAndCleanupAllData();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 3c14f2f..8da362d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -22,8 +22,10 @@ import com.google.common.io.BaseEncoding;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.StringUtils;
@@ -41,6 +43,7 @@ import java.security.NoSuchAlgorithmException;
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
  * Utility class to work with blob data.
@@ -78,18 +81,49 @@ public class BlobUtils {
         * @throws IOException
         *              thrown if the (distributed) file storage cannot be 
created
         */
-       static BlobStore createBlobStoreFromConfig(Configuration config) throws 
IOException {
+       public static BlobStoreService createBlobStoreFromConfig(Configuration 
config) throws IOException {
                HighAvailabilityMode highAvailabilityMode = 
HighAvailabilityMode.fromConfig(config);
 
                if (highAvailabilityMode == HighAvailabilityMode.NONE) {
                        return new VoidBlobStore();
                } else if (highAvailabilityMode == 
HighAvailabilityMode.ZOOKEEPER) {
-                       return ZooKeeperHaServices.createBlobStore(config);
+                       return createFileSystemBlobStore(config);
                } else {
                        throw new IllegalConfigurationException("Unexpected 
high availability mode '" + highAvailabilityMode + "'.");
                }
        }
 
+       private static BlobStoreService createFileSystemBlobStore(Configuration 
configuration) throws IOException {
+               String storagePath = configuration.getValue(
+                       HighAvailabilityOptions.HA_STORAGE_PATH);
+               if (isNullOrWhitespaceOnly(storagePath)) {
+                       throw new IllegalConfigurationException("Configuration 
is missing the mandatory parameter: " +
+                               HighAvailabilityOptions.HA_STORAGE_PATH);
+               }
+
+               final Path path;
+               try {
+                       path = new Path(storagePath);
+               } catch (Exception e) {
+                       throw new IOException("Invalid path for highly 
available storage (" +
+                               HighAvailabilityOptions.HA_STORAGE_PATH.key() + 
')', e);
+               }
+
+               final FileSystem fileSystem;
+               try {
+                       fileSystem = path.getFileSystem();
+               } catch (Exception e) {
+                       throw new IOException("Could not create FileSystem for 
highly available storage (" +
+                               HighAvailabilityOptions.HA_STORAGE_PATH.key() + 
')', e);
+               }
+
+               final String clusterId =
+                       
configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+               storagePath += "/" + clusterId;
+
+               return new FileSystemBlobStore(fileSystem, storagePath);
+       }
+
        /**
         * Creates a storage directory for a blob service.
         *
@@ -246,10 +280,10 @@ public class BlobUtils {
                        @Override
                        public void run() {
                                try {
-                                       service.shutdown();
+                                       service.close();
                                }
                                catch (Throwable t) {
-                                       logger.error("Error during shutdown of 
blob service via JVM shutdown hook: " + t.getMessage(), t);
+                                       logger.error("Error during shutdown of 
blob service via JVM shutdown hook.", t);
                                }
                        }
                });

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
new file mode 100644
index 0000000..11cf011
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobView.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * View on blobs stored in a {@link BlobStore}.
+ */
+public interface BlobView {
+
+       /**
+        * Copies a blob to a local file.
+        *
+        * @param blobKey   The blob ID
+        * @param localFile The local file to copy to
+        * @throws IOException If the copy fails
+        */
+       void get(BlobKey blobKey, File localFile) throws IOException;
+
+       /**
+        * Copies a blob to a local file.
+        *
+        * @param jobId     The JobID part of ID for the blob
+        * @param key       The String part of ID for the blob
+        * @param localFile The local file to copy to
+        * @throws IOException If the copy fails
+        */
+       void get(JobID jobId, String key, File localFile) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 7cfce7a..b54756c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -41,7 +41,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>This is used in addition to the local blob storage for high availability.
  */
-public class FileSystemBlobStore implements BlobStore {
+public class FileSystemBlobStore implements BlobStoreService {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBlobStore.class);
 
@@ -157,14 +157,19 @@ public class FileSystemBlobStore implements BlobStore {
        }
 
        @Override
-       public void cleanUp() {
+       public void closeAndCleanupAllData() {
                try {
                        LOG.debug("Cleaning up {}.", basePath);
 
                        fileSystem.delete(new Path(basePath), true);
                }
                catch (Exception e) {
-                       LOG.error("Failed to clean up recovery directory.");
+                       LOG.error("Failed to clean up recovery directory.", e);
                }
        }
+
+       @Override
+       public void close() throws IOException {
+               // nothing to do for the FileSystemBlobStore
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index 8606844..c14d082 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 /**
  * A blob store doing nothing.
  */
-public class VoidBlobStore implements BlobStore {
+public class VoidBlobStore implements BlobStoreService {
 
        @Override
        public void put(File localFile, BlobKey blobKey) throws IOException {
@@ -57,6 +57,8 @@ public class VoidBlobStore implements BlobStore {
        }
 
        @Override
-       public void cleanUp() {
-       }
+       public void closeAndCleanupAllData() {}
+
+       @Override
+       public void close() throws IOException {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index b570383..86d927a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -191,7 +191,8 @@ public class JobClient {
        public static ClassLoader retrieveClassLoader(
                JobID jobID,
                ActorGateway jobManager,
-               Configuration config)
+               Configuration config,
+               HighAvailabilityServices highAvailabilityServices)
                throws JobRetrievalException {
 
                final Object jmAnswer;
@@ -213,7 +214,8 @@ public class JobClient {
                        InetSocketAddress serverAddress = new 
InetSocketAddress(jmHostname, props.blobManagerPort());
                        final BlobCache blobClient;
                        try {
-                               blobClient = new BlobCache(serverAddress, 
config);
+                               // TODO: Fix lifecycle of BlobCache to properly 
close it upon usage
+                               blobClient = new BlobCache(serverAddress, 
config, highAvailabilityServices.createBlobStore());
                        } catch (IOException e) {
                                throw new JobRetrievalException(jobID,
                                        "Failed to setup blob cache", e);
@@ -229,7 +231,12 @@ public class JobClient {
                                try {
                                        allURLs[pos++] = 
blobClient.getURL(blobKey);
                                } catch (Exception e) {
-                                       blobClient.shutdown();
+                                       try {
+                                               blobClient.close();
+                                       } catch (IOException ioe) {
+                                               LOG.warn("Could not properly 
close the BlobClient.", ioe);
+                                       }
+
                                        throw new JobRetrievalException(jobID, 
"Failed to download BlobKey " + blobKey, e);
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
index fe8c34c..bb448be 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobListeningContext.java
@@ -134,7 +134,11 @@ public final class JobListeningContext {
        public ClassLoader getClassLoader() throws JobRetrievalException {
                if (classLoader == null) {
                        // lazily initializes the class loader when it is needed
-                       classLoader = JobClient.retrieveClassLoader(jobID, 
getJobManager(), configuration);
+                       classLoader = JobClient.retrieveClassLoader(
+                               jobID,
+                               getJobManager(),
+                               configuration,
+                               highAvailabilityServices);
                        LOG.info("Reconstructed class loader for Job {}", 
jobID);
                }
                return classLoader;

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index ea508d1..5bdfe1a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -31,7 +31,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.NetUtils;
@@ -191,13 +190,12 @@ public class BootstrapTools {
                if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
0) >= 0) {
                        logger.info("Starting JobManager Web Frontend");
 
-                       LeaderRetrievalService leaderRetrievalService =
-                               
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
-
                        // start the web frontend. we need to load this 
dynamically
                        // because it is not in the same project/dependencies
                        WebMonitor monitor = 
WebMonitorUtils.startWebRuntimeMonitor(
-                               config, leaderRetrievalService, actorSystem);
+                               config,
+                               highAvailabilityServices,
+                               actorSystem);
 
                        // start the web monitor
                        if (monitor != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index b0d5d83..0702a11 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -208,7 +208,7 @@ public final class BlobLibraryCacheManager extends 
TimerTask implements LibraryC
                        LOG.warn("Failed to run clean up task before shutdown", 
t);
                }
 
-               blobService.shutdown();
+               blobService.close();
                cleanupTimer.cancel();
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index c9e2957..2ebfd20 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.highavailability;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.blob.BlobUtils;
 import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
 import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
@@ -49,10 +51,13 @@ public class HighAvailabilityServicesUtils {
                                return new EmbeddedHaServices(executor);
 
                        case ZOOKEEPER:
+                               BlobStoreService blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
+
                                return new ZooKeeperHaServices(
                                        
ZooKeeperUtils.startCuratorFramework(config),
                                        executor,
-                                       config);
+                                       config,
+                                       blobStoreService);
 
                        default:
                                throw new Exception("High availability mode " + 
highAvailabilityMode + " is not supported.");
@@ -85,10 +90,13 @@ public class HighAvailabilityServicesUtils {
 
                                return new 
StandaloneHaServices(resourceManagerRpcUrl, jobManagerRpcUrl);
                        case ZOOKEEPER:
+                               BlobStoreService blobStoreService = 
BlobUtils.createBlobStoreFromConfig(configuration);
+
                                return new ZooKeeperHaServices(
                                        
ZooKeeperUtils.startCuratorFramework(configuration),
                                        executor,
-                                       configuration);
+                                       configuration,
+                                       blobStoreService);
                        default:
                                throw new Exception("Recovery mode " + 
highAvailabilityMode + " is not supported.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
index ac90e3f..9c3d986 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java
@@ -44,10 +44,13 @@ public abstract class AbstractNonHaServices implements 
HighAvailabilityServices
 
        private final RunningJobsRegistry runningJobsRegistry;
 
+       private final VoidBlobStore voidBlobStore;
+
        private boolean shutdown;
 
        public AbstractNonHaServices() {
                this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
+               this.voidBlobStore = new VoidBlobStore();
 
                shutdown = false;
        }
@@ -88,7 +91,7 @@ public abstract class AbstractNonHaServices implements 
HighAvailabilityServices
                synchronized (lock) {
                        checkNotShutdown();
 
-                       return new VoidBlobStore();
+                       return voidBlobStore;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index 5d895c1..d4748cd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -23,11 +23,8 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobStore;
-import org.apache.flink.runtime.blob.FileSystemBlobStore;
+import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -36,12 +33,12 @@ import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
  * An implementation of the {@link HighAvailabilityServices} using Apache 
ZooKeeper.
@@ -102,11 +99,20 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
        /** The zookeeper based running jobs registry */
        private final RunningJobsRegistry runningJobsRegistry;
 
-       public ZooKeeperHaServices(CuratorFramework client, Executor executor, 
Configuration configuration) {
+       /** Store for arbitrary blobs */
+       private final BlobStoreService blobStoreService;
+
+       public ZooKeeperHaServices(
+                       CuratorFramework client,
+                       Executor executor,
+                       Configuration configuration,
+                       BlobStoreService blobStoreService) {
                this.client = checkNotNull(client);
                this.executor = checkNotNull(executor);
                this.configuration = checkNotNull(configuration);
                this.runningJobsRegistry = new 
ZooKeeperRunningJobsRegistry(client, configuration);
+
+               this.blobStoreService = checkNotNull(blobStoreService);
        }
 
        // 
------------------------------------------------------------------------
@@ -150,61 +156,52 @@ public class ZooKeeperHaServices implements 
HighAvailabilityServices {
 
        @Override
        public BlobStore createBlobStore() throws IOException {
-               return createBlobStore(configuration);
+               return blobStoreService;
        }
 
-       /**
-        * Creates the BLOB store in which BLOBs are stored in a 
highly-available
-        * fashion.
-        *
-        * @param configuration configuration to extract the storage path from
-        * @return Blob store
-        * @throws IOException if the blob store could not be created
-        */
-       public static BlobStore createBlobStore(
-               final Configuration configuration) throws IOException {
-               String storagePath = configuration.getValue(
-                       HighAvailabilityOptions.HA_STORAGE_PATH);
-               if (isNullOrWhitespaceOnly(storagePath)) {
-                       throw new IllegalConfigurationException("Configuration 
is missing the mandatory parameter: " +
-                                       
HighAvailabilityOptions.HA_STORAGE_PATH);
-               }
+       // 
------------------------------------------------------------------------
+       //  Shutdown
+       // 
------------------------------------------------------------------------
 
-               final Path path;
-               try {
-                       path = new Path(storagePath);
-               } catch (Exception e) {
-                       throw new IOException("Invalid path for highly 
available storage (" +
-                                       
HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
-               }
+       @Override
+       public void close() throws Exception {
+               Throwable exception = null;
 
-               final FileSystem fileSystem;
                try {
-                       fileSystem = path.getFileSystem();
-               } catch (Exception e) {
-                       throw new IOException("Could not create FileSystem for 
highly available storage (" +
-                                       
HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+                       blobStoreService.close();
+               } catch (Throwable t) {
+                       exception = t;
                }
 
-               final String clusterId =
-                       
configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
-               storagePath += "/" + clusterId;
+               internalClose();
 
-               return new FileSystemBlobStore(fileSystem, storagePath);
+               if (exception != null) {
+                       ExceptionUtils.rethrowException(exception, "Could not 
properly close the ZooKeeperHaServices.");
+               }
        }
 
-       // 
------------------------------------------------------------------------
-       //  Shutdown
-       // 
------------------------------------------------------------------------
-
        @Override
-       public void close() throws Exception {
-               client.close();
+       public void closeAndCleanupAllData() throws Exception {
+               Throwable exception = null;
+
+               try {
+                       blobStoreService.closeAndCleanupAllData();
+               } catch (Throwable t) {
+                       exception = t;
+               }
+
+               internalClose();
+
+               if (exception != null) {
+                       ExceptionUtils.rethrowException(exception, "Could not 
properly close and clean up all data of ZooKeeperHaServices.");
+               }
        }
 
-       @Override
-       public void closeAndCleanupAllData() throws Exception {
-               close();
+       /**
+        * Closes components which don't distinguish between close and 
closeAndCleanupAllData
+        */
+       private void internalClose() {
+               client.close();
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index 8cda0f7..ac4d06f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -105,7 +105,7 @@ public class JobManagerServices {
                        Configuration config,
                        HighAvailabilityServices haServices) throws Exception {
 
-               final BlobServer blobServer = new BlobServer(config, 
haServices);
+               final BlobServer blobServer = new BlobServer(config, 
haServices.createBlobStore());
 
                final long cleanupInterval = config.getLong(
                        ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d05d900..a919065 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -923,7 +923,10 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
                final LibraryCacheManager libraryCacheManager;
                try {
-                       final BlobCache blobCache = new 
BlobCache(blobServerAddress, taskManagerConfiguration.getConfiguration(), 
haServices);
+                       final BlobCache blobCache = new BlobCache(
+                               blobServerAddress,
+                               taskManagerConfiguration.getConfiguration(),
+                               haServices.createBlobStore());
                        libraryCacheManager = new BlobLibraryCacheManager(
                                blobCache,
                                taskManagerConfiguration.getCleanupInterval());

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index dd9527e..3853b21 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -28,10 +28,12 @@ import java.net.URI;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobView;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -117,12 +119,14 @@ public final class WebMonitorUtils {
         * Because failure to start the web runtime monitor is not considered 
fatal, this method does
         * not throw any exceptions, but only logs them.
         *
-        * @param config                 The configuration for the runtime 
monitor.
-        * @param leaderRetrievalService Leader retrieval service to get the 
leading JobManager
+        * @param config The configuration for the runtime monitor.
+        * @param highAvailabilityServices HighAvailabilityServices used to 
start the WebRuntimeMonitor
+        * @param actorSystem ActorSystem used to connect to the JobManager
+        *
         */
        public static WebMonitor startWebRuntimeMonitor(
                        Configuration config,
-                       LeaderRetrievalService leaderRetrievalService,
+                       HighAvailabilityServices highAvailabilityServices,
                        ActorSystem actorSystem) {
                // try to load and instantiate the class
                try {
@@ -130,9 +134,14 @@ public final class WebMonitorUtils {
                        Class<? extends WebMonitor> clazz = 
Class.forName(classname).asSubclass(WebMonitor.class);
                        
                        Constructor<? extends WebMonitor> constructor = 
clazz.getConstructor(Configuration.class,
-                                       LeaderRetrievalService.class,
-                                       ActorSystem.class);
-                       return constructor.newInstance(config, 
leaderRetrievalService, actorSystem);
+                               LeaderRetrievalService.class,
+                               BlobView.class,
+                               ActorSystem.class);
+                       return constructor.newInstance(
+                               config,
+                               
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+                               highAvailabilityServices.createBlobStore(),
+                               actorSystem);
                } catch (ClassNotFoundException e) {
                        LOG.error("Could not load web runtime monitor. " +
                                        "Probably reason: flink-runtime-web is 
not in the classpath");

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 57a6415..6095094 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -36,7 +36,7 @@ import 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup
 import org.apache.flink.metrics.{Gauge, MetricGroup}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
-import org.apache.flink.runtime.blob.BlobServer
+import org.apache.flink.runtime.blob.{BlobServer, BlobStore}
 import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, 
SavepointStore}
 import org.apache.flink.runtime.client._
@@ -46,7 +46,7 @@ import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceMa
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, 
BiFunction, Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import 
org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, 
LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph._
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, 
HighAvailabilityServicesUtils}
@@ -2274,14 +2274,12 @@ object JobManager {
     val webMonitor: Option[WebMonitor] =
       if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
0) >= 0) {
         LOG.info("Starting JobManager web frontend")
-        val leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(
-          HighAvailabilityServices.DEFAULT_JOB_ID)
 
         // start the web frontend. we need to load this dynamically
         // because it is not in the same project/dependencies
         val webServer = WebMonitorUtils.startWebRuntimeMonitor(
           configuration,
-          leaderRetrievalService,
+          highAvailabilityServices,
           jobManagerSystem)
 
         Option(webServer)
@@ -2507,12 +2505,14 @@ object JobManager {
    * @param configuration The configuration from which to parse the config 
values.
    * @param futureExecutor to run JobManager's futures
    * @param ioExecutor to run blocking io operations
+   * @param blobStore to store blobs persistently
    * @return The members for a default JobManager.
    */
   def createJobManagerComponents(
       configuration: Configuration,
       futureExecutor: ScheduledExecutorService,
-      ioExecutor: Executor) :
+      ioExecutor: Executor,
+      blobStore: BlobStore) :
     (InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
@@ -2557,7 +2557,7 @@ object JobManager {
     var libraryCacheManager: BlobLibraryCacheManager = null
 
     try {
-      blobServer = new BlobServer(configuration)
+      blobServer = new BlobServer(configuration, blobStore)
       instanceManager = new InstanceManager()
       scheduler = new 
FlinkScheduler(ExecutionContext.fromExecutor(futureExecutor))
       libraryCacheManager = new BlobLibraryCacheManager(blobServer, 
cleanupInterval)
@@ -2576,7 +2576,7 @@ object JobManager {
           instanceManager.shutdown()
         }
         if (blobServer != null) {
-          blobServer.shutdown()
+          blobServer.close()
         }
         
         throw t
@@ -2688,7 +2688,8 @@ object JobManager {
     metricsRegistry) = createJobManagerComponents(
       configuration,
       futureExecutor,
-      ioExecutor)
+      ioExecutor,
+      highAvailabilityServices.createBlobStore())
 
     val archiveProps = getArchiveProps(archiveClass, archiveCount, archivePath)
 
@@ -2744,7 +2745,7 @@ object JobManager {
     ioExecutor: Executor,
     instanceManager: InstanceManager,
     scheduler: FlinkScheduler,
-    libraryCacheManager: BlobLibraryCacheManager,
+    libraryCacheManager: LibraryCacheManager,
     archive: ActorRef,
     restartStrategyFactory: RestartStrategyFactory,
     timeout: FiniteDuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 46c4404..2ace8db 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -37,7 +37,7 @@ import 
org.apache.flink.runtime.highavailability.{HighAvailabilityServices, High
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
 import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
-import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService, StandaloneLeaderRetrievalService}
+import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService}
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -387,17 +387,13 @@ abstract class FlinkMiniCluster(
       config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
         config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
 
-      // TODO: Add support for HA: Make web server work independently from the 
JM
-      val leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(
-        HighAvailabilityServices.DEFAULT_JOB_ID)
-
       LOG.info("Starting JobManger web frontend")
       // start the new web frontend. we need to load this dynamically
       // because it is not in the same project/dependencies
       val webServer = Option(
         WebMonitorUtils.startWebRuntimeMonitor(
           config,
-          leaderRetrievalService,
+          highAvailabilityServices,
           actorSystem)
       )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 8677307..a535388 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -143,7 +143,8 @@ class LocalFlinkMiniCluster(
     metricsRegistry) = JobManager.createJobManagerComponents(
       config,
       futureExecutor,
-      ioExecutor)
+      ioExecutor,
+      highAvailabilityServices.createBlobStore())
 
     if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
       metricsRegistry.get.startQueryService(system, null)
@@ -249,8 +250,6 @@ class LocalFlinkMiniCluster(
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment,
-      highAvailabilityServices.getJobManagerLeaderRetriever(
-        HighAvailabilityServices.DEFAULT_JOB_ID),
       metricRegistry)
 
     if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
@@ -315,7 +314,6 @@ class LocalFlinkMiniCluster(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
-    leaderRetrievalService: LeaderRetrievalService,
     metricsRegistry: MetricRegistry): Props = {
 
     TaskManager.getTaskManagerProps(
@@ -326,7 +324,7 @@ class LocalFlinkMiniCluster(
       memoryManager,
       ioManager,
       networkEnvironment,
-      leaderRetrievalService,
+      highAvailabilityServices,
       metricsRegistry)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a3110a4..7684a6b 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -125,7 +125,7 @@ class TaskManager(
     protected val ioManager: IOManager,
     protected val network: NetworkEnvironment,
     protected val numberOfSlots: Int,
-    protected val leaderRetrievalService: LeaderRetrievalService,
+    protected val highAvailabilityServices: HighAvailabilityServices,
     protected val metricsRegistry: FlinkMetricRegistry)
   extends FlinkActor
   with LeaderSessionMessageFilter // Mixin order is important: We want to 
filter after logging
@@ -149,6 +149,10 @@ class TaskManager(
   /** Handler for distributed files cached by this TaskManager */
   protected val fileCache = new FileCache(config.getTmpDirectories())
 
+  protected val leaderRetrievalService: LeaderRetrievalService = 
highAvailabilityServices.
+    getJobManagerLeaderRetriever(
+      HighAvailabilityServices.DEFAULT_JOB_ID)
+
   private var taskManagerMetricGroup : TaskManagerMetricGroup = _
 
   /** Actors which want to be notified once this task manager has been
@@ -959,7 +963,10 @@ class TaskManager(
       log.info(s"Determined BLOB server address to be $address. Starting BLOB 
cache.")
 
       try {
-        val blobcache = new BlobCache(address, config.getConfiguration())
+        val blobcache = new BlobCache(
+          address,
+          config.getConfiguration(),
+          highAvailabilityServices.createBlobStore())
         blobService = Option(blobcache)
         libraryCacheManager = Some(
           new BlobLibraryCacheManager(blobcache, config.getCleanupInterval()))
@@ -1039,12 +1046,24 @@ class TaskManager(
 
     // shut down BLOB and library cache
     libraryCacheManager foreach {
-      manager => manager.shutdown()
+      manager =>
+        try {
+          manager.shutdown()
+        } catch {
+          case ioe: IOException => log.error(
+            "Could not properly shutdown library cache manager.",
+            ioe)
+        }
     }
     libraryCacheManager = None
 
     blobService foreach {
-      service => service.shutdown()
+      service =>
+        try {
+          service.close()
+        } catch {
+          case ioe: IOException => log.error("Could not properly shutdown blob 
service.", ioe)
+        }
     }
     blobService = None
 
@@ -1905,9 +1924,6 @@ object TaskManager {
 
     val metricRegistry = taskManagerServices.getMetricRegistry()
 
-    val leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(
-      HighAvailabilityServices.DEFAULT_JOB_ID)
-
     // create the actor properties (which define the actor constructor 
parameters)
     val tmProps = getTaskManagerProps(
       taskManagerClass,
@@ -1917,7 +1933,7 @@ object TaskManager {
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment(),
-      leaderRetrievalService,
+      highAvailabilityServices,
       metricRegistry)
 
     metricRegistry.startQueryService(actorSystem, resourceID)
@@ -1936,7 +1952,7 @@ object TaskManager {
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
-    leaderRetrievalService: LeaderRetrievalService,
+    highAvailabilityServices: HighAvailabilityServices,
     metricsRegistry: FlinkMetricRegistry
   ): Props = {
     Props(
@@ -1948,7 +1964,7 @@ object TaskManager {
       ioManager,
       networkEnvironment,
       taskManagerConfig.getNumberSlots(),
-      leaderRetrievalService,
+      highAvailabilityServices,
       metricsRegistry)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 34a8a39..1cf77ea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -43,10 +43,10 @@ public class BlobCacheRetriesTest {
         * A test where the connection fails twice and then the get operation 
succeeds.
         */
        @Test
-       public void testBlobFetchRetries() {
+       public void testBlobFetchRetries() throws IOException {
                final Configuration config = new Configuration();
 
-               testBlobFetchRetries(config);
+               testBlobFetchRetries(config, new VoidBlobStore());
        }
 
        /**
@@ -54,13 +54,23 @@ public class BlobCacheRetriesTest {
         * (with high availability set).
         */
        @Test
-       public void testBlobFetchRetriesHa() {
+       public void testBlobFetchRetriesHa() throws IOException {
                final Configuration config = new Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
                        temporaryFolder.getRoot().getPath());
 
-               testBlobFetchRetries(config);
+               BlobStoreService blobStoreService = null;
+
+               try {
+                       blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
+
+                       testBlobFetchRetries(config, blobStoreService);
+               } finally {
+                       if (blobStoreService != null) {
+                               blobStoreService.closeAndCleanupAllData();
+                       }
+               }
        }
 
        /**
@@ -71,14 +81,14 @@ public class BlobCacheRetriesTest {
         *              configuration to use (the BlobCache will get some 
additional settings
         *              set compared to this one)
         */
-       private void testBlobFetchRetries(final Configuration config) {
+       private void testBlobFetchRetries(final Configuration config, final 
BlobStore blobStore) throws IOException {
                final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
 
                BlobServer server = null;
                BlobCache cache = null;
                try {
 
-                       server = new TestingFailingBlobServer(config, 2);
+                       server = new TestingFailingBlobServer(config, 
blobStore, 2);
 
                        final InetSocketAddress
                                serverAddress = new 
InetSocketAddress("localhost", server.getPort());
@@ -97,13 +107,7 @@ public class BlobCacheRetriesTest {
                                }
                        }
 
-                       // create a separate config for the cache with no 
access to
-                       // the (shared) storage path if available so that the 
cache
-                       // will always bother the BlobServer!
-                       final Configuration cacheConfig = new 
Configuration(config);
-                       
cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-                               temporaryFolder.getRoot().getPath() + 
"/does-not-exist");
-                       cache = new BlobCache(serverAddress, cacheConfig);
+                       cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
 
                        // trigger a download - it should fail the first two 
times, but retry, and succeed eventually
                        URL url = cache.getURL(key);
@@ -116,17 +120,12 @@ public class BlobCacheRetriesTest {
                        finally {
                                is.close();
                        }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
+               } finally {
                        if (cache != null) {
-                               cache.shutdown();
+                               cache.close();
                        }
                        if (server != null) {
-                               server.shutdown();
+                               server.close();
                        }
                }
        }
@@ -135,10 +134,10 @@ public class BlobCacheRetriesTest {
         * A test where the connection fails too often and eventually fails the 
GET request.
         */
        @Test
-       public void testBlobFetchWithTooManyFailures() {
+       public void testBlobFetchWithTooManyFailures() throws IOException {
                final Configuration config = new Configuration();
 
-               testBlobFetchWithTooManyFailures(config);
+               testBlobFetchWithTooManyFailures(config, new VoidBlobStore());
        }
 
        /**
@@ -146,13 +145,23 @@ public class BlobCacheRetriesTest {
         * (with high availability set).
         */
        @Test
-       public void testBlobFetchWithTooManyFailuresHa() {
+       public void testBlobFetchWithTooManyFailuresHa() throws IOException {
                final Configuration config = new Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
                        temporaryFolder.getRoot().getPath());
 
-               testBlobFetchWithTooManyFailures(config);
+               BlobStoreService blobStoreService = null;
+
+               try {
+                       blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
+
+                       testBlobFetchWithTooManyFailures(config, 
blobStoreService);
+               } finally {
+                       if (blobStoreService != null) {
+                               blobStoreService.closeAndCleanupAllData();
+                       }
+               }
        }
 
        /**
@@ -163,14 +172,14 @@ public class BlobCacheRetriesTest {
         *              configuration to use (the BlobCache will get some 
additional settings
         *              set compared to this one)
         */
-       private void testBlobFetchWithTooManyFailures(final Configuration 
config) {
+       private void testBlobFetchWithTooManyFailures(final Configuration 
config, final BlobStore blobStore) throws IOException {
                final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
 
                BlobServer server = null;
                BlobCache cache = null;
                try {
 
-                       server = new TestingFailingBlobServer(config, 10);
+                       server = new TestingFailingBlobServer(config, 
blobStore, 10);
 
                        final InetSocketAddress
                                serverAddress = new 
InetSocketAddress("localhost", server.getPort());
@@ -189,13 +198,7 @@ public class BlobCacheRetriesTest {
                                }
                        }
 
-                       // create a separate config for the cache with no 
access to
-                       // the (shared) storage path if available so that the 
cache
-                       // will always bother the BlobServer!
-                       final Configuration cacheConfig = new 
Configuration(config);
-                       
cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-                               temporaryFolder.getRoot().getPath() + 
"/does-not-exist");
-                       cache = new BlobCache(serverAddress, cacheConfig);
+                       cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
 
                        // trigger a download - it should fail eventually
                        try {
@@ -206,16 +209,12 @@ public class BlobCacheRetriesTest {
                                // as we expected
                        }
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
                finally {
                        if (cache != null) {
-                               cache.shutdown();
+                               cache.close();
                        }
                        if (server != null) {
-                               server.shutdown();
+                               server.close();
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index db55331..2a65a3b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -49,7 +50,7 @@ public class BlobCacheSuccessTest {
         * BlobServer.
         */
        @Test
-       public void testBlobCache() {
+       public void testBlobCache() throws IOException {
                Configuration config = new Configuration();
                uploadFileGetTest(config, false, false);
        }
@@ -60,7 +61,7 @@ public class BlobCacheSuccessTest {
         * BlobServer.
         */
        @Test
-       public void testBlobCacheHa() {
+       public void testBlobCacheHa() throws IOException {
                Configuration config = new Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
@@ -73,7 +74,7 @@ public class BlobCacheSuccessTest {
         * file system and thus needs to download BLOBs from the BlobServer.
         */
        @Test
-       public void testBlobCacheHaFallback() {
+       public void testBlobCacheHaFallback() throws IOException {
                Configuration config = new Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
@@ -82,17 +83,30 @@ public class BlobCacheSuccessTest {
        }
 
        private void uploadFileGetTest(final Configuration config, boolean 
cacheWorksWithoutServer,
-               boolean cacheHasAccessToFs) {
+               boolean cacheHasAccessToFs) throws IOException {
                // First create two BLOBs and upload them to BLOB server
                final byte[] buf = new byte[128];
                final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2);
 
                BlobServer blobServer = null;
                BlobCache blobCache = null;
+               BlobStoreService blobStoreService = null;
                try {
+                       final Configuration cacheConfig;
+                       if (cacheHasAccessToFs) {
+                               cacheConfig = config;
+                       } else {
+                               // just in case parameters are still read from 
the server,
+                               // create a separate configuration object for 
the cache
+                               cacheConfig = new Configuration(config);
+                               
cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+                                       temporaryFolder.getRoot().getPath() + 
"/does-not-exist");
+                       }
+
+                       blobStoreService = 
BlobUtils.createBlobStoreFromConfig(cacheConfig);
 
                        // Start the BLOB server
-                       blobServer = new BlobServer(config);
+                       blobServer = new BlobServer(config, blobStoreService);
                        final InetSocketAddress serverAddress = new 
InetSocketAddress(blobServer.getPort());
 
                        // Upload BLOBs
@@ -112,22 +126,11 @@ public class BlobCacheSuccessTest {
 
                        if (cacheWorksWithoutServer) {
                                // Now, shut down the BLOB server, the BLOBs 
must still be accessible through the cache.
-                               blobServer.shutdown();
+                               blobServer.close();
                                blobServer = null;
                        }
 
-                       final Configuration cacheConfig;
-                       if (cacheHasAccessToFs) {
-                               cacheConfig = config;
-                       } else {
-                               // just in case parameters are still read from 
the server,
-                               // create a separate configuration object for 
the cache
-                               cacheConfig = new Configuration(config);
-                               
cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-                                       temporaryFolder.getRoot().getPath() + 
"/does-not-exist");
-                       }
-
-                       blobCache = new BlobCache(serverAddress, cacheConfig);
+                       blobCache = new BlobCache(serverAddress, cacheConfig, 
blobStoreService);
 
                        for (BlobKey blobKey : blobKeys) {
                                blobCache.getURL(blobKey);
@@ -135,7 +138,7 @@ public class BlobCacheSuccessTest {
 
                        if (blobServer != null) {
                                // Now, shut down the BLOB server, the BLOBs 
must still be accessible through the cache.
-                               blobServer.shutdown();
+                               blobServer.close();
                                blobServer = null;
                        }
 
@@ -162,18 +165,17 @@ public class BlobCacheSuccessTest {
                                        fail(e.getMessage());
                                }
                        }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
+               } finally {
                        if (blobServer != null) {
-                               blobServer.shutdown();
+                               blobServer.close();
                        }
 
                        if(blobCache != null){
-                               blobCache.shutdown();
+                               blobCache.close();
+                       }
+
+                       if (blobStoreService != null) {
+                               blobStoreService.closeAndCleanupAllData();
                        }
                }
        }

Reply via email to