http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index 27603d0..f9052e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -43,7 +44,7 @@ import org.junit.Test;
 /**
  * This class contains unit tests for the {@link BlobClient} with ssl enabled.
  */
-public class BlobClientSslTest {
+public class BlobClientSslTest extends TestLogger {
 
        /** The buffer size used during the tests in bytes. */
        private static final int TEST_BUFFER_SIZE = 17 * 1000;
@@ -64,19 +65,14 @@ public class BlobClientSslTest {
         * Starts the SSL enabled BLOB server.
         */
        @BeforeClass
-       public static void startSSLServer() {
-               try {
-                       Configuration config = new Configuration();
-                       config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-                       config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-                       config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-                       config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-                       BLOB_SSL_SERVER = new BlobServer(config);
-               }
-               catch (IOException e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+       public static void startSSLServer() throws IOException {
+               Configuration config = new Configuration();
+               config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+               config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+               config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+               config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+               BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore());
+
 
                sslClientConfig = new Configuration();
                sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
@@ -88,20 +84,14 @@ public class BlobClientSslTest {
         * Starts the SSL disabled BLOB server.
         */
        @BeforeClass
-       public static void startNonSSLServer() {
-               try {
-                       Configuration config = new Configuration();
-                       config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
-                       config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
-                       config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
-                       config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
-                       config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
-                       BLOB_SERVER = new BlobServer(config);
-               }
-               catch (IOException e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+       public static void startNonSSLServer() throws IOException {
+               Configuration config = new Configuration();
+               config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
+               config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
+               config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore");
+               config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password");
+               config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password");
+               BLOB_SERVER = new BlobServer(config, new VoidBlobStore());
 
                clientConfig = new Configuration();
                clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true);
@@ -114,13 +104,13 @@ public class BlobClientSslTest {
         * Shuts the BLOB server down.
         */
        @AfterClass
-       public static void stopServers() {
+       public static void stopServers() throws IOException {
                if (BLOB_SSL_SERVER != null) {
-                       BLOB_SSL_SERVER.shutdown();
+                       BLOB_SSL_SERVER.close();
                }
 
                if (BLOB_SERVER != null) {
-                       BLOB_SERVER.shutdown();
+                       BLOB_SERVER.close();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 8f8f8c5..fda4ee9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -57,24 +57,18 @@ public class BlobClientTest {
         * Starts the BLOB server.
         */
        @BeforeClass
-       public static void startServer() {
-               try {
-                       blobServiceConfig = new Configuration();
-                       BLOB_SERVER = new BlobServer(blobServiceConfig);
-               }
-               catch (IOException e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
+       public static void startServer() throws IOException {
+               blobServiceConfig = new Configuration();
+               BLOB_SERVER = new BlobServer(blobServiceConfig, new VoidBlobStore());
        }
 
        /**
         * Shuts the BLOB server down.
         */
        @AfterClass
-       public static void stopServer() {
+       public static void stopServer() throws IOException {
                if (BLOB_SERVER != null) {
-                       BLOB_SERVER.shutdown();
+                       BLOB_SERVER.close();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index f8d50d5..4f12ddb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -30,16 +30,13 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -59,10 +56,20 @@ public class BlobRecoveryITCase extends TestLogger {
                config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath());
 
-               testBlobServerRecovery(config);
+               BlobStoreService blobStoreService = null;
+
+               try {
+                       blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
+
+                       testBlobServerRecovery(config, blobStoreService);
+               } finally {
+                       if (blobStoreService != null) {
+                               blobStoreService.closeAndCleanupAllData();
+                       }
+               }
        }
 
-       public static void testBlobServerRecovery(final Configuration config) throws IOException {
+       public static void testBlobServerRecovery(final Configuration config, final BlobStore blobStore) throws IOException {
                final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
                String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId;
                Random rand = new Random();
@@ -73,7 +80,7 @@ public class BlobRecoveryITCase extends TestLogger {
 
                try {
                        for (int i = 0; i < server.length; i++) {
-                               server[i] = new BlobServer(config);
+                               server[i] = new BlobServer(config, blobStore);
                                serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort());
                        }
 
@@ -166,7 +173,7 @@ public class BlobRecoveryITCase extends TestLogger {
                finally {
                        for (BlobServer s : server) {
                                if (s != null) {
-                                       s.shutdown();
+                                       s.close();
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 025a2ff..e8e28a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -44,10 +44,11 @@ public class BlobServerDeleteTest {
        public void testDeleteSingle() {
                BlobServer server = null;
                BlobClient client = null;
+               BlobStore blobStore = new VoidBlobStore();
 
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, blobStore);
 
                        InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
                        client = new BlobClient(serverAddress, config);
@@ -93,10 +94,11 @@ public class BlobServerDeleteTest {
        public void testDeleteAll() {
                BlobServer server = null;
                BlobClient client = null;
+               BlobStore blobStore = new VoidBlobStore();
 
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, blobStore);
 
                        InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
                        client = new BlobClient(serverAddress, config);
@@ -156,10 +158,11 @@ public class BlobServerDeleteTest {
        public void testDeleteAlreadyDeletedByBlobKey() {
                BlobServer server = null;
                BlobClient client = null;
+               BlobStore blobStore = new VoidBlobStore();
 
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, blobStore);
 
                        InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
                        client = new BlobClient(serverAddress, config);
@@ -195,10 +198,11 @@ public class BlobServerDeleteTest {
        public void testDeleteAlreadyDeletedByName() {
                BlobServer server = null;
                BlobClient client = null;
+               BlobStore blobStore = new VoidBlobStore();
 
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, blobStore);
 
                        InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
                        client = new BlobClient(serverAddress, config);
@@ -237,10 +241,11 @@ public class BlobServerDeleteTest {
 
                BlobServer server = null;
                BlobClient client = null;
+               BlobStore blobStore = new VoidBlobStore();
 
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, blobStore);
 
                        InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
                        client = new BlobClient(serverAddress, config);
@@ -289,7 +294,11 @@ public class BlobServerDeleteTest {
                        }
                }
                if (server != null) {
-                       server.shutdown();
+                       try {
+                               server.close();
+                       } catch (IOException e) {
+                               e.printStackTrace();
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 59a62e1..6d1dba8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -40,13 +40,13 @@ public class BlobServerGetTest {
        private final Random rnd = new Random();
 
        @Test
-       public void testGetFailsDuringLookup() {
+       public void testGetFailsDuringLookup() throws IOException {
                BlobServer server = null;
                BlobClient client = null;
 
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, new VoidBlobStore());
 
                        InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
                        client = new BlobClient(serverAddress, config);
@@ -66,37 +66,27 @@ public class BlobServerGetTest {
                        try {
                                client.get(key);
                                fail("This should not succeed.");
-                       }
-                       catch (IOException e) {
+                       } catch (IOException e) {
                                // expected
                        }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
+               } finally {
                        if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (Throwable t) {
-                                       t.printStackTrace();
-                               }
+                               client.close();
                        }
                        if (server != null) {
-                               server.shutdown();
+                               server.close();
                        }
                }
        }
 
        @Test
-       public void testGetFailsDuringStreaming() {
+       public void testGetFailsDuringStreaming() throws IOException {
                BlobServer server = null;
                BlobClient client = null;
 
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, new VoidBlobStore());
 
                        InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
                        client = new BlobClient(serverAddress, config);
@@ -129,21 +119,12 @@ public class BlobServerGetTest {
                        catch (IOException e) {
                                // expected
                        }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
+               } finally {
                        if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (Throwable t) {
-                                       t.printStackTrace();
-                               }
+                               client.close();
                        }
                        if (server != null) {
-                               server.shutdown();
+                               server.close();
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index c4d6d1c..441ca7d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -42,13 +42,13 @@ public class BlobServerPutTest {
        private final Random rnd = new Random();
 
        @Test
-       public void testPutBufferSuccessful() {
+       public void testPutBufferSuccessful() throws IOException {
                BlobServer server = null;
                BlobClient client = null;
 
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, new VoidBlobStore());
 
                        InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
                        client = new BlobClient(serverAddress, config);
@@ -95,34 +95,25 @@ public class BlobServerPutTest {
                        BlobUtils.readFully(is3, result3, 0, result3.length, null);
                        is3.close();
                        assertArrayEquals(data, result3);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
+               } finally {
                        if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (Throwable t) {
-                                       t.printStackTrace();
-                               }
+                               client.close();
                        }
                        if (server != null) {
-                               server.shutdown();
+                               server.close();
                        }
                }
        }
 
 
        @Test
-       public void testPutStreamSuccessful() {
+       public void testPutStreamSuccessful() throws IOException {
                BlobServer server = null;
                BlobClient client = null;
 
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, new VoidBlobStore());
 
                        InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
                        client = new BlobClient(serverAddress, config);
@@ -143,12 +134,7 @@ public class BlobServerPutTest {
                                String stringKey = "my test key";
                                client.put(jid, stringKey, new ByteArrayInputStream(data));
                        }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
+               } finally {
                        if (client != null) {
                                try {
                                        client.close();
@@ -157,19 +143,19 @@ public class BlobServerPutTest {
                                }
                        }
                        if (server != null) {
-                               server.shutdown();
+                               server.close();
                        }
                }
        }
 
        @Test
-       public void testPutChunkedStreamSuccessful() {
+       public void testPutChunkedStreamSuccessful() throws IOException {
                BlobServer server = null;
                BlobClient client = null;
 
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, new VoidBlobStore());
 
                        InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
                        client = new BlobClient(serverAddress, config);
@@ -190,27 +176,18 @@ public class BlobServerPutTest {
                                String stringKey = "my test key";
                                client.put(jid, stringKey, new ChunkedInputStream(data, 17));
                        }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
+               } finally {
                        if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (Throwable t) {
-                                       t.printStackTrace();
-                               }
+                               client.close();
                        }
                        if (server != null) {
-                               server.shutdown();
+                               server.close();
                        }
                }
        }
 
        @Test
-       public void testPutBufferFails() {
+       public void testPutBufferFails() throws IOException {
                assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
                BlobServer server = null;
@@ -219,7 +196,7 @@ public class BlobServerPutTest {
                File tempFileDir = null;
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, new VoidBlobStore());
 
                        // make sure the blob server cannot create any files in its storage dir
                        tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
@@ -250,31 +227,22 @@ public class BlobServerPutTest {
                                // expected
                        }
 
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
+               } finally {
                        // set writable again to make sure we can remove the directory
                        if (tempFileDir != null) {
                                tempFileDir.setWritable(true, false);
                        }
                        if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (Throwable t) {
-                                       t.printStackTrace();
-                               }
+                               client.close();
                        }
                        if (server != null) {
-                               server.shutdown();
+                               server.close();
                        }
                }
        }
 
        @Test
-       public void testPutNamedBufferFails() {
+       public void testPutNamedBufferFails() throws IOException {
                assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows.
 
                BlobServer server = null;
@@ -283,7 +251,7 @@ public class BlobServerPutTest {
                File tempFileDir = null;
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, new VoidBlobStore());
 
                        // make sure the blob server cannot create any files in its storage dir
                        tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile();
@@ -317,25 +285,16 @@ public class BlobServerPutTest {
                        catch (IllegalStateException e) {
                                // expected
                        }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
+               } finally {
                        // set writable again to make sure we can remove the directory
                        if (tempFileDir != null) {
                                tempFileDir.setWritable(true, false);
                        }
                        if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (Throwable t) {
-                                       t.printStackTrace();
-                               }
+                               client.close();
                        }
                        if (server != null) {
-                               server.shutdown();
+                               server.close();
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
index c3762aa..120d86a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -39,8 +39,8 @@ public class BlobServerRangeTest extends TestLogger {
        public void testOnEphemeralPort() throws IOException {
                Configuration conf = new Configuration();
                conf.setString(BlobServerOptions.PORT, "0");
-               BlobServer srv = new BlobServer(conf);
-               srv.shutdown();
+               BlobServer srv = new BlobServer(conf, new VoidBlobStore());
+               srv.close();
        }
 
        /**
@@ -63,7 +63,7 @@ public class BlobServerRangeTest extends TestLogger {
 
                // this thing is going to throw an exception
                try {
-                       BlobServer srv = new BlobServer(conf);
+                       BlobServer srv = new BlobServer(conf, new VoidBlobStore());
                } finally {
                        socket.close();
                }
@@ -92,9 +92,9 @@ public class BlobServerRangeTest extends TestLogger {
 
                // this thing is going to throw an exception
                try {
-                       BlobServer srv = new BlobServer(conf);
+                       BlobServer srv = new BlobServer(conf, new VoidBlobStore());
                        Assert.assertEquals(availablePort, srv.getPort());
-                       srv.shutdown();
+                       srv.close();
                } finally {
                        sockets[0].close();
                        sockets[1].close();

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
index 93f9b73..91e119b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
@@ -28,8 +28,8 @@ public class TestingFailingBlobServer extends BlobServer {
 
        private int numFailures;
 
-       public TestingFailingBlobServer(Configuration config, int numFailures) throws IOException {
-               super(config);
+       public TestingFailingBlobServer(Configuration config, BlobStore blobStore, int numFailures) throws IOException {
+               super(config, blobStore);
                this.numFailures = numFailures;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 5d9ade3..98e6b3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.runtime.execution.librarycache;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.OperatingSystem;
@@ -45,7 +44,7 @@ import java.util.List;
 public class BlobLibraryCacheManagerTest {
 
        @Test
-       public void testLibraryCacheManagerCleanup() {
+       public void testLibraryCacheManagerCleanup() throws IOException, 
InterruptedException {
 
                JobID jid = new JobID();
                List<BlobKey> keys = new ArrayList<BlobKey>();
@@ -56,7 +55,7 @@ public class BlobLibraryCacheManagerTest {
 
                try {
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, new VoidBlobStore());
                        InetSocketAddress blobSocketAddress = new 
InetSocketAddress(server.getPort());
                        BlobClient bc = new BlobClient(blobSocketAddress, 
config);
 
@@ -108,14 +107,9 @@ public class BlobLibraryCacheManagerTest {
                        assertEquals(2, caughtExceptions);
 
                        bc.close();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
+               } finally {
                        if (server != null) {
-                               server.shutdown();
+                               server.close();
                        }
 
                        if (libraryCacheManager != null) {
@@ -130,7 +124,7 @@ public class BlobLibraryCacheManagerTest {
        }
 
        @Test
-       public void testRegisterAndDownload() {
+       public void testRegisterAndDownload() throws IOException {
                assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't 
work on Windows.
 
                BlobServer server = null;
@@ -139,9 +133,9 @@ public class BlobLibraryCacheManagerTest {
                try {
                        // create the blob transfer services
                        Configuration config = new Configuration();
-                       server = new BlobServer(config);
+                       server = new BlobServer(config, new VoidBlobStore());
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
-                       cache = new BlobCache(serverAddress, config);
+                       cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());
 
                        // upload some meaningless data to the server
                        BlobClient uploader = new BlobClient(serverAddress, 
config);
@@ -210,22 +204,17 @@ public class BlobLibraryCacheManagerTest {
                        catch (IOException e) {
                                // splendid!
                        }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
+               } finally {
                        if (cacheDir != null) {
                                if (!cacheDir.setWritable(true, false)) {
                                        System.err.println("Could not re-add 
write permissions to cache directory.");
                                }
                        }
                        if (cache != null) {
-                               cache.shutdown();
+                               cache.close();
                        }
                        if (server != null) {
-                               server.shutdown();
+                               server.close();
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 54e1a9b..16e3a05 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.TestLogger;
@@ -63,6 +65,7 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                BlobLibraryCacheManager[] libServer = new 
BlobLibraryCacheManager[2];
                BlobCache cache = null;
                BlobLibraryCacheManager libCache = null;
+               BlobStoreService blobStoreService = null;
 
                Configuration config = new Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
@@ -70,8 +73,10 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.getRoot().getAbsolutePath());
 
                try {
+                       blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
+
                        for (int i = 0; i < server.length; i++) {
-                               server[i] = new BlobServer(config);
+                               server[i] = new BlobServer(config, 
blobStoreService);
                                serverAddress[i] = new 
InetSocketAddress("localhost", server[i].getPort());
                                libServer[i] = new 
BlobLibraryCacheManager(server[i], 3600 * 1000);
                        }
@@ -89,7 +94,7 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                        }
 
                        // The cache
-                       cache = new BlobCache(serverAddress[0], config);
+                       cache = new BlobCache(serverAddress[0], config, 
blobStoreService);
                        libCache = new BlobLibraryCacheManager(cache, 3600 * 
1000);
 
                        // Register uploaded libraries
@@ -110,10 +115,10 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                        }
 
                        // Shutdown cache and start with other server
-                       cache.shutdown();
+                       cache.close();
                        libCache.shutdown();
 
-                       cache = new BlobCache(serverAddress[1], config);
+                       cache = new BlobCache(serverAddress[1], config, 
blobStoreService);
                        libCache = new BlobLibraryCacheManager(cache, 3600 * 
1000);
 
                        // Verify key 1
@@ -156,17 +161,21 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                finally {
                        for (BlobServer s : server) {
                                if (s != null) {
-                                       s.shutdown();
+                                       s.close();
                                }
                        }
 
                        if (cache != null) {
-                               cache.shutdown();
+                               cache.close();
                        }
 
                        if (libCache != null) {
                                libCache.shutdown();
                        }
+
+                       if (blobStoreService != null) {
+                               blobStoreService.closeAndCleanupAllData();
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
index 06ffe3c..d89093d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java
@@ -22,11 +22,11 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import 
org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
-import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -62,7 +62,10 @@ public class ZooKeeperRegistryTest extends TestLogger {
                configuration.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
 
                final HighAvailabilityServices zkHaService = new 
ZooKeeperHaServices(
-                               
ZooKeeperUtils.startCuratorFramework(configuration), 
Executors.directExecutor(), configuration);
+                               
ZooKeeperUtils.startCuratorFramework(configuration),
+                       Executors.directExecutor(),
+                       configuration,
+                       new VoidBlobStore());
 
                final RunningJobsRegistry zkRegistry = 
zkHaService.getRunningJobsRegistry();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index b8b5984..a63b02d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -43,7 +43,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -71,7 +70,6 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -102,7 +100,6 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.runtime.BoxedUnit;
 
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -190,7 +187,11 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                TestingUtils.defaultExecutor(),
                                instanceManager,
                                scheduler,
-                               new BlobLibraryCacheManager(new 
BlobServer(flinkConfiguration), 3600000),
+                               new BlobLibraryCacheManager(
+                                       new BlobServer(
+                                               flinkConfiguration,
+                                               
testingHighAvailabilityServices.createBlobStore()),
+                                       3600000L),
                                archive,
                                new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
                                timeout,

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index d6257ba..70800e5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -31,6 +31,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -184,7 +185,7 @@ public class JobManagerLeaderElectionTest extends 
TestLogger {
                        TestingUtils.defaultExecutor(),
                        new InstanceManager(),
                        new Scheduler(TestingUtils.defaultExecutionContext()),
-                       new BlobLibraryCacheManager(new 
BlobServer(configuration), 10L),
+                       new BlobLibraryCacheManager(new 
BlobServer(configuration, new VoidBlobStore()), 10L),
                        ActorRef.noSender(),
                        new NoRestartStrategy.NoRestartStrategyFactory(),
                        AkkaUtils.getDefaultTimeoutAsFiniteDuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
index 0ea47f2..0282a4f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java
@@ -18,17 +18,21 @@
 
 package org.apache.flink.runtime.leaderelection;
 
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -64,10 +68,13 @@ public class ZooKeeperLeaderRetrievalTest extends 
TestLogger{
                config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
                config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
 
-               highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-                       config,
+               CuratorFramework client = 
ZooKeeperUtils.startCuratorFramework(config);
+
+               highAvailabilityServices = new ZooKeeperHaServices(
+                       client,
                        TestingUtils.defaultExecutor(),
-                       
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
+                       config,
+                       new VoidBlobStore());
        }
 
        @After

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index 58f2231..d6fc48c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -97,7 +97,7 @@ public class TaskManagerMetricsTest extends TestLogger {
                                taskManagerServices.getMemoryManager(),
                                taskManagerServices.getIOManager(),
                                taskManagerServices.getNetworkEnvironment(),
-                               
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+                               highAvailabilityServices,
                                tmRegistry);
 
                        final ActorRef taskManager = 
actorSystem.actorOf(tmProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 2a4c036..9dcfc70 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -149,9 +149,6 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
 
                        network.start();
 
-                       LeaderRetrievalService leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(
-                               HighAvailabilityServices.DEFAULT_JOB_ID);
-
                        MetricRegistryConfiguration metricRegistryConfiguration 
= MetricRegistryConfiguration.fromConfiguration(config);
 
                        // create the task manager
@@ -164,7 +161,7 @@ public class TaskManagerComponentsStartupShutdownTest 
extends TestLogger {
                                ioManager,
                                network,
                                numberOfSlots,
-                               leaderRetrievalService,
+                               highAvailabilityServices,
                                new 
MetricRegistry(metricRegistryConfiguration));
 
                        taskManager = actorSystem.actorOf(tmProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 92de31a..0844aad 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -57,6 +58,7 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -601,7 +603,7 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
        }
 
        @Test
-       public void testCheckForValidRegistrationSessionIDs() {
+       public void testCheckForValidRegistrationSessionIDs() throws 
IOException {
                new JavaTestKit(actorSystem) {{
 
                        ActorGateway taskManagerGateway = null;
@@ -612,6 +614,7 @@ public class TaskManagerRegistrationTest extends TestLogger 
{
                        HighAvailabilityServices mockedHighAvailabilityServices 
= mock(HighAvailabilityServices.class);
                        
when(mockedHighAvailabilityServices.getJobManagerLeaderRetriever(Matchers.eq(HighAvailabilityServices.DEFAULT_JOB_ID)))
                                .thenReturn(new 
StandaloneLeaderRetrievalService(getTestActor().path().toString(), 
trueLeaderSessionID));
+                       
when(mockedHighAvailabilityServices.createBlobStore()).thenReturn(new 
VoidBlobStore());
 
                        try {
                                // we make the test actor (the test kit) the 
JobManager to intercept

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties 
b/flink-runtime/src/test/resources/log4j-test.properties
index 7ba1633..98f136a 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=OFF, console
+log4j.rootLogger=INFO, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
index 4be3299..1b9ee48 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala
@@ -249,7 +249,8 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll with Befor
     val components = JobManager.createJobManagerComponents(
       config,
       executor,
-      executor)
+      executor,
+      highAvailabilityServices.createBlobStore())
 
     // Start the JobManager without a MetricRegistry so that we don't start 
the MetricQueryService.
     // The problem of the MetricQueryService is that it starts an actor with a 
fixed name. Thus,

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 09dc5ed..1db0a85 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.testingUtils
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
@@ -32,15 +32,15 @@ import scala.language.postfixOps
 /** Subclass of the [[TaskManager]] to support testing messages
  */
 class TestingTaskManager(
-                          config: TaskManagerConfiguration,
-                          resourceID: ResourceID,
-                          connectionInfo: TaskManagerLocation,
-                          memoryManager: MemoryManager,
-                          ioManager: IOManager,
-                          network: NetworkEnvironment,
-                          numberOfSlots: Int,
-                          leaderRetrievalService: LeaderRetrievalService,
-                          metricRegistry : MetricRegistry)
+    config: TaskManagerConfiguration,
+    resourceID: ResourceID,
+    connectionInfo: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    highAvailabilityServices: HighAvailabilityServices,
+    metricRegistry : MetricRegistry)
   extends TaskManager(
     config,
     resourceID,
@@ -49,19 +49,19 @@ class TestingTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService,
+    highAvailabilityServices,
     metricRegistry)
   with TestingTaskManagerLike {
 
   def this(
-            config: TaskManagerConfiguration,
-            connectionInfo: TaskManagerLocation,
-            memoryManager: MemoryManager,
-            ioManager: IOManager,
-            network: NetworkEnvironment,
-            numberOfSlots: Int,
-            leaderRetrievalService: LeaderRetrievalService,
-            metricRegistry : MetricRegistry) {
+    config: TaskManagerConfiguration,
+    connectionInfo: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    highAvailabilityServices: HighAvailabilityServices,
+    metricRegistry : MetricRegistry) {
     this(
       config,
       ResourceID.generate(),
@@ -70,7 +70,7 @@ class TestingTaskManager(
       ioManager,
       network,
       numberOfSlots,
-      leaderRetrievalService,
+      highAvailabilityServices,
       metricRegistry)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 5f9d178..2983d66 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -155,6 +155,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase 
extends TestLogger {
                Configuration config = new Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkQuorum);
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
FileStateBackendBasePath.getAbsolutePath());
 
                ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
                                "leader", 1, config);

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
index 0f82faa..1df4b8d 100644
--- 
a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
+++ 
b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
@@ -40,19 +41,19 @@ import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
   * @param ioManager IOManager responsible for I/O
   * @param network NetworkEnvironment for this actor
   * @param numberOfSlots Number of slots for this TaskManager
-  * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the 
current leading
-  *                              JobManager
+  * @param highAvailabilityServices [[HighAvailabilityServices]] to create a 
leader retrieval
+  *                                service for retrieving the leading 
JobManager
   */
 class TestingYarnTaskManager(
-                              config: TaskManagerConfiguration,
-                              resourceID: ResourceID,
-                              connectionInfo: TaskManagerLocation,
-                              memoryManager: MemoryManager,
-                              ioManager: IOManager,
-                              network: NetworkEnvironment,
-                              numberOfSlots: Int,
-                              leaderRetrievalService: LeaderRetrievalService,
-                              metricRegistry : MetricRegistry)
+    config: TaskManagerConfiguration,
+    resourceID: ResourceID,
+    connectionInfo: TaskManagerLocation,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    highAvailabilityServices: HighAvailabilityServices,
+    metricRegistry : MetricRegistry)
   extends YarnTaskManager(
     config,
     resourceID,
@@ -61,7 +62,7 @@ class TestingYarnTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService,
+    highAvailabilityServices,
     metricRegistry)
   with TestingTaskManagerLike {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
index e9c3904..f81d040 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.blob.FileSystemBlobStore;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -91,6 +92,9 @@ public abstract class YarnHighAvailabilityServices implements 
HighAvailabilitySe
         * HA services clean up */
        protected final Path haDataDirectory;
 
+       /** Blob store service to be used for the BlobServer and BlobCache */
+       protected final BlobStoreService blobStoreService;
+
        /** Flag marking this instance as shut down */
        private volatile boolean closed;
 
@@ -153,6 +157,8 @@ public abstract class YarnHighAvailabilityServices 
implements HighAvailabilitySe
                }
 
                LOG.info("Flink YARN application will store recovery data at 
{}", haDataDirectory);
+
+               blobStoreService = new FileSystemBlobStore(flinkFileSystem, 
haDataDirectory.toString());
        }
 
        // 
------------------------------------------------------------------------
@@ -163,7 +169,7 @@ public abstract class YarnHighAvailabilityServices 
implements HighAvailabilitySe
        public BlobStore createBlobStore() throws IOException {
                enter();
                try {
-                       return new FileSystemBlobStore(flinkFileSystem, 
haDataDirectory.toString());
+                       return blobStoreService;
                } finally {
                        exit();
                }
@@ -192,11 +198,23 @@ public abstract class YarnHighAvailabilityServices 
implements HighAvailabilitySe
                        }
                        closed = true;
 
+                       Throwable exception = null;
+
+                       try {
+                               blobStoreService.close();
+                       } catch (Throwable t) {
+                               exception = t;
+                       }
+
                        // we do not propagate exceptions here, but only log 
them
                        try {
                                hadoopFileSystem.close();
                        } catch (Throwable t) {
-                               LOG.warn("Error closing Hadoop FileSystem", t);
+                               exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+                       }
+
+                       if (exception != null) {
+                               ExceptionUtils.rethrowException(exception, 
"Could not properly close the YarnHighAvailabilityServices.");
                        }
                }
                finally {
@@ -213,12 +231,18 @@ public abstract class YarnHighAvailabilityServices 
implements HighAvailabilitySe
                        // we remember exceptions only, then continue cleanup, 
and re-throw at the end
                        Throwable exception = null;
 
+                       try {
+                               blobStoreService.closeAndCleanupAllData();
+                       } catch (Throwable t) {
+                               exception = t;
+                       }
+
                        // first, we delete all data in Flink's data directory
                        try {
                                flinkFileSystem.delete(haDataDirectory, true);
                        }
                        catch (Throwable t) {
-                               exception = t;
+                               exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
                        }
 
                        // now we actually close the services

http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index be31085..b7f4c9a 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.yarn
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.metrics.MetricRegistry
@@ -38,7 +38,7 @@ class YarnTaskManager(
     ioManager: IOManager,
     network: NetworkEnvironment,
     numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService,
+    highAvailabilityServices: HighAvailabilityServices,
     metricRegistry : MetricRegistry)
   extends TaskManager(
     config,
@@ -48,7 +48,7 @@ class YarnTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService,
+    highAvailabilityServices,
     metricRegistry) {
 
   override def handleMessage: Receive = {

Reply via email to