Repository: flink
Updated Branches:
  refs/heads/master 305098614 -> c26c2e7b5


[FLINK-7053][blob] Improve code quality in BlobServer related tests

This lets BlobClientSslTest extend BlobClientTest, because most of its
implementation was simply copied from there.

[FLINK-7053][blob] verify some of the buffers returned by GET

[FLINK-7053][blob] use TemporaryFolder for local BLOB dir in unit tests

This replaces the use of ad-hoc temporary directories, which are not guaranteed
to be deleted after the test.

This closes #4234.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c26c2e7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c26c2e7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c26c2e7b

Branch: refs/heads/master
Commit: c26c2e7b5c12fc2442446a6fb2d801eb87022c12
Parents: 3050986
Author: Nico Kruber <n...@data-artisans.com>
Authored: Thu Jun 22 17:31:17 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Aug 7 12:01:24 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/hdfstests/HDFSTest.java    |   8 +
 .../runtime/blob/BlobCacheRetriesTest.java      |  11 +-
 .../runtime/blob/BlobCacheSuccessTest.java      |  68 ++++-
 .../flink/runtime/blob/BlobClientSslTest.java   | 256 +++++--------------
 .../flink/runtime/blob/BlobClientTest.java      |  80 ++++--
 .../flink/runtime/blob/BlobRecoveryITCase.java  |   4 +-
 .../runtime/blob/BlobServerDeleteTest.java      |  24 +-
 .../flink/runtime/blob/BlobServerGetTest.java   |  27 +-
 .../flink/runtime/blob/BlobServerPutTest.java   |  32 ++-
 .../flink/runtime/blob/BlobServerRangeTest.java |  10 +
 .../flink/runtime/blob/BlobUtilsTest.java       |   6 +-
 .../BlobLibraryCacheManagerTest.java            |  15 ++
 .../BlobLibraryCacheRecoveryITCase.java         |   9 +-
 13 files changed, 296 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
index c490c9f..f70845b 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -48,7 +49,9 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
@@ -70,6 +73,9 @@ public class HDFSTest {
        private org.apache.hadoop.fs.Path hdPath;
        protected org.apache.hadoop.fs.FileSystem hdfs;
 
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
        @BeforeClass
        public static void verifyOS() {
                Assume.assumeTrue("HDFS cluster cannot be started on Windows 
without extensions.", !OperatingSystem.isWindows());
@@ -242,6 +248,8 @@ public class HDFSTest {
                        config = new 
org.apache.flink.configuration.Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
hdfsURI);
 
                BlobStoreService blobStoreService = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
index 1cf77ea..366b592 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.junit.Rule;
@@ -45,6 +46,8 @@ public class BlobCacheRetriesTest {
        @Test
        public void testBlobFetchRetries() throws IOException {
                final Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
 
                testBlobFetchRetries(config, new VoidBlobStore());
        }
@@ -56,9 +59,11 @@ public class BlobCacheRetriesTest {
        @Test
        public void testBlobFetchRetriesHa() throws IOException {
                final Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-                       temporaryFolder.getRoot().getPath());
+                       temporaryFolder.newFolder().getPath());
 
                BlobStoreService blobStoreService = null;
 
@@ -136,6 +141,8 @@ public class BlobCacheRetriesTest {
        @Test
        public void testBlobFetchWithTooManyFailures() throws IOException {
                final Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
 
                testBlobFetchWithTooManyFailures(config, new VoidBlobStore());
        }
@@ -147,6 +154,8 @@ public class BlobCacheRetriesTest {
        @Test
        public void testBlobFetchWithTooManyFailuresHa() throws IOException {
                final Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
                        temporaryFolder.getRoot().getPath());

http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
index 2a65a3b..51be1b0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.util.Preconditions;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -52,6 +55,9 @@ public class BlobCacheSuccessTest {
        @Test
        public void testBlobCache() throws IOException {
                Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
+
                uploadFileGetTest(config, false, false);
        }
 
@@ -63,27 +69,63 @@ public class BlobCacheSuccessTest {
        @Test
        public void testBlobCacheHa() throws IOException {
                Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-                       temporaryFolder.getRoot().getPath());
+                       temporaryFolder.newFolder().getPath());
                uploadFileGetTest(config, true, true);
        }
 
        /**
+        * BlobCache is configured in HA mode and the cache can download files 
from
+        * the file system directly and does not need to download BLOBs from the
+        * BlobServer.
+        */
+       @Test
+       public void testBlobCacheHa2() throws IOException {
+               Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
+               config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+                       temporaryFolder.newFolder().getPath());
+               uploadFileGetTest(config, false, true);
+       }
+
+       /**
         * BlobCache is configured in HA mode but the cache itself cannot 
access the
         * file system and thus needs to download BLOBs from the BlobServer.
         */
        @Test
        public void testBlobCacheHaFallback() throws IOException {
                Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-                       temporaryFolder.getRoot().getPath());
+                       temporaryFolder.newFolder().getPath());
                uploadFileGetTest(config, false, false);
        }
 
-       private void uploadFileGetTest(final Configuration config, boolean 
cacheWorksWithoutServer,
-               boolean cacheHasAccessToFs) throws IOException {
+       /**
+        * Uploads two different BLOBs to the {@link BlobServer} via a {@link 
BlobClient} and verifies
+        * we can access the files from a {@link BlobCache}.
+        *
+        * @param config
+        *              configuration to use for the server and cache (the 
final cache's configuration will
+        *              actually get some modifications)
+        * @param shutdownServerAfterUpload
+        *              whether the server should be shut down after uploading 
the BLOBs (only useful with HA mode)
+        *              - this implies that the cache has access to the shared 
<tt>HA_STORAGE_PATH</tt>
+        * @param cacheHasAccessToFs
+        *              whether the cache should have access to a shared 
<tt>HA_STORAGE_PATH</tt> (only useful with
+        *              HA mode)
+        */
+       private void uploadFileGetTest(final Configuration config, boolean 
shutdownServerAfterUpload,
+                       boolean cacheHasAccessToFs) throws IOException {
+               Preconditions.checkArgument(!shutdownServerAfterUpload || 
cacheHasAccessToFs);
+
                // First create two BLOBs and upload them to BLOB server
                final byte[] buf = new byte[128];
                final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2);
@@ -92,15 +134,15 @@ public class BlobCacheSuccessTest {
                BlobCache blobCache = null;
                BlobStoreService blobStoreService = null;
                try {
-                       final Configuration cacheConfig;
-                       if (cacheHasAccessToFs) {
-                               cacheConfig = config;
-                       } else {
-                               // just in case parameters are still read from 
the server,
-                               // create a separate configuration object for 
the cache
-                               cacheConfig = new Configuration(config);
+                       final Configuration cacheConfig = new 
Configuration(config);
+                       
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                               temporaryFolder.newFolder().getAbsolutePath());
+                       if (!cacheHasAccessToFs) {
+                               // make sure the cache cannot access the HA 
store directly
+                               
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                                       
temporaryFolder.newFolder().getAbsolutePath());
                                
cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
-                                       temporaryFolder.getRoot().getPath() + 
"/does-not-exist");
+                                       temporaryFolder.newFolder().getPath() + 
"/does-not-exist");
                        }
 
                        blobStoreService = 
BlobUtils.createBlobStoreFromConfig(cacheConfig);
@@ -124,7 +166,7 @@ public class BlobCacheSuccessTest {
                                }
                        }
 
-                       if (cacheWorksWithoutServer) {
+                       if (shutdownServerAfterUpload) {
                                // Now, shut down the BLOB server, the BLOBs 
must still be accessible through the cache.
                                blobServer.close();
                                blobServer = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
index a5189ea..790514c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java
@@ -18,48 +18,36 @@
 
 package org.apache.flink.runtime.blob;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetSocketAddress;
-import java.security.MessageDigest;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
 
 /**
  * This class contains unit tests for the {@link BlobClient} with ssl enabled.
  */
-public class BlobClientSslTest extends TestLogger {
-
-       /** The buffer size used during the tests in bytes. */
-       private static final int TEST_BUFFER_SIZE = 17 * 1000;
+public class BlobClientSslTest extends BlobClientTest {
 
        /** The instance of the SSL BLOB server used during the tests. */
        private static BlobServer BLOB_SSL_SERVER;
 
-       /** The SSL blob service client configuration */
+       /** Instance of a non-SSL BLOB server with SSL-enabled security 
options. */
+       private static BlobServer BLOB_NON_SSL_SERVER;
+
+       /** The SSL blob service client configuration. */
        private static Configuration sslClientConfig;
 
-       /** The instance of the non-SSL BLOB server used during the tests. */
-       private static BlobServer BLOB_SERVER;
+       /** The non-SSL blob service client configuration with SSL-enabled 
security options. */
+       private static Configuration nonSslClientConfig;
 
-       /** The non-ssl blob service client configuration */
-       private static Configuration clientConfig;
+       @ClassRule
+       public static TemporaryFolder temporarySslFolder = new 
TemporaryFolder();
 
        /**
         * Starts the SSL enabled BLOB server.
@@ -67,6 +55,8 @@ public class BlobClientSslTest extends TestLogger {
        @BeforeClass
        public static void startSSLServer() throws IOException {
                Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporarySslFolder.newFolder().getAbsolutePath());
                config.setBoolean(SecurityOptions.SSL_ENABLED, true);
                config.setString(SecurityOptions.SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
                config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, 
"password");
@@ -79,24 +69,23 @@ public class BlobClientSslTest extends TestLogger {
                
sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password");
        }
 
-       /**
-        * Starts the SSL disabled BLOB server.
-        */
        @BeforeClass
        public static void startNonSSLServer() throws IOException {
                Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporarySslFolder.newFolder().getAbsolutePath());
                config.setBoolean(SecurityOptions.SSL_ENABLED, true);
                config.setBoolean(BlobServerOptions.SSL_ENABLED, false);
                config.setString(SecurityOptions.SSL_KEYSTORE, 
"src/test/resources/local127.keystore");
                config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, 
"password");
                config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password");
-               BLOB_SERVER = new BlobServer(config, new VoidBlobStore());
+               BLOB_NON_SSL_SERVER = new BlobServer(config, new 
VoidBlobStore());
 
-               clientConfig = new Configuration();
-               clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true);
-               clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false);
-               clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
-               clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, 
"password");
+               nonSslClientConfig = new Configuration();
+               nonSslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, 
true);
+               nonSslClientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, 
false);
+               nonSslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, 
"src/test/resources/local127.truststore");
+               
nonSslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, 
"password");
        }
 
        /**
@@ -107,195 +96,92 @@ public class BlobClientSslTest extends TestLogger {
                if (BLOB_SSL_SERVER != null) {
                        BLOB_SSL_SERVER.close();
                }
-
-               if (BLOB_SERVER != null) {
-                       BLOB_SERVER.close();
+               if (BLOB_NON_SSL_SERVER != null) {
+                       BLOB_NON_SSL_SERVER.close();
                }
        }
 
-       /**
-        * Prepares a test file for the unit tests, i.e. the methods fills the 
file with a particular byte patterns and
-        * computes the file's BLOB key.
-        *
-        * @param file
-        *        the file to prepare for the unit tests
-        * @return the BLOB key of the prepared file
-        * @throws IOException
-        *         thrown if an I/O error occurs while writing to the test file
-        */
-       private static BlobKey prepareTestFile(File file) throws IOException {
-
-               MessageDigest md = BlobUtils.createMessageDigest();
-
-               final byte[] buf = new byte[TEST_BUFFER_SIZE];
-               for (int i = 0; i < buf.length; ++i) {
-                       buf[i] = (byte) (i % 128);
-               }
-
-               FileOutputStream fos = null;
-               try {
-                       fos = new FileOutputStream(file);
-
-                       for (int i = 0; i < 20; ++i) {
-                               fos.write(buf);
-                               md.update(buf);
-                       }
-
-               } finally {
-                       if (fos != null) {
-                               fos.close();
-                       }
-               }
+       protected Configuration getBlobClientConfig() {
+               return sslClientConfig;
+       }
 
-               return new BlobKey(md.digest());
+       protected BlobServer getBlobServer() {
+               return BLOB_SSL_SERVER;
        }
 
        /**
-        * Validates the result of a GET operation by comparing the data from 
the retrieved input stream to the content of
-        * the specified file.
-        *
-        * @param inputStream
-        *        the input stream returned from the GET operation
-        * @param file
-        *        the file to compare the input stream's data to
-        * @throws IOException
-        *         thrown if an I/O error occurs while reading the input stream 
or the file
+        * Verify ssl client to ssl server upload
         */
-       private static void validateGet(final InputStream inputStream, final 
File file) throws IOException {
-
-               InputStream inputStream2 = null;
-               try {
-
-                       inputStream2 = new FileInputStream(file);
-
-                       while (true) {
-
-                               final int r1 = inputStream.read();
-                               final int r2 = inputStream2.read();
-
-                               assertEquals(r2, r1);
-
-                               if (r1 < 0) {
-                                       break;
-                               }
-                       }
-
-               } finally {
-                       if (inputStream2 != null) {
-                               inputStream2.close();
-                       }
-               }
-
+       @Test
+       public void testUploadJarFilesHelper() throws Exception {
+               uploadJarFile(BLOB_SSL_SERVER, sslClientConfig);
        }
 
        /**
-        * Tests the PUT/GET operations for content-addressable streams.
+        * Verify ssl client to non-ssl server failure
         */
-       @Test
-       public void testContentAddressableStream() {
-
-               BlobClient client = null;
-               InputStream is = null;
-
-               try {
-                       File testFile = File.createTempFile("testfile", ".dat");
-                       testFile.deleteOnExit();
-
-                       BlobKey origKey = prepareTestFile(testFile);
-
-                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort());
-                       client = new BlobClient(serverAddress, sslClientConfig);
-
-                       // Store the data
-                       is = new FileInputStream(testFile);
-                       BlobKey receivedKey = client.put(is);
-                       assertEquals(origKey, receivedKey);
-
-                       is.close();
-                       is = null;
-
-                       // Retrieve the data
-                       is = client.get(receivedKey);
-                       validateGet(is, testFile);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       if (is != null) {
-                               try {
-                                       is.close();
-                               } catch (Throwable t) {}
-                       }
-                       if (client != null) {
-                               try {
-                                       client.close();
-                               } catch (Throwable t) {}
-                       }
-               }
+       @Test(expected = IOException.class)
+       public void testSSLClientFailure() throws Exception {
+               // SSL client connected to non-ssl server
+               uploadJarFile(BLOB_SERVER, sslClientConfig);
        }
 
        /**
-        * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, List)} helper.
+        * Verify ssl client to non-ssl server failure
         */
-       private void uploadJarFile(BlobServer blobServer, Configuration 
blobClientConfig) throws Exception {
-               final File testFile = File.createTempFile("testfile", ".dat");
-               testFile.deleteOnExit();
-               prepareTestFile(testFile);
-
-               InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", blobServer.getPort());
-
-               List<BlobKey> blobKeys = 
BlobClient.uploadJarFiles(serverAddress, blobClientConfig,
-                       Collections.singletonList(new Path(testFile.toURI())));
+       @Test(expected = IOException.class)
+       public void testSSLClientFailure2() throws Exception {
+               // SSL client connected to non-ssl server
+               uploadJarFile(BLOB_NON_SSL_SERVER, sslClientConfig);
+       }
 
-               assertEquals(1, blobKeys.size());
+       /**
+        * Verify non-ssl client to ssl server failure
+        */
+       @Test(expected = IOException.class)
+       public void testSSLServerFailure() throws Exception {
+               // Non-SSL client connected to ssl server
+               uploadJarFile(BLOB_SSL_SERVER, clientConfig);
+       }
 
-               try (BlobClient blobClient = new BlobClient(serverAddress, 
blobClientConfig)) {
-                       InputStream is = blobClient.get(blobKeys.get(0));
-                       validateGet(is, testFile);
-               }
+       /**
+        * Verify non-ssl client to ssl server failure
+        */
+       @Test(expected = IOException.class)
+       public void testSSLServerFailure2() throws Exception {
+               // Non-SSL client connected to ssl server
+               uploadJarFile(BLOB_SSL_SERVER, nonSslClientConfig);
        }
 
        /**
-        * Verify ssl client to ssl server upload
+        * Verify non-ssl connection sanity
         */
        @Test
-       public void testUploadJarFilesHelper() throws Exception {
-               uploadJarFile(BLOB_SSL_SERVER, sslClientConfig);
+       public void testNonSSLConnection() throws Exception {
+               uploadJarFile(BLOB_SERVER, clientConfig);
        }
 
        /**
-        * Verify ssl client to non-ssl server failure
+        * Verify non-ssl connection sanity
         */
        @Test
-       public void testSSLClientFailure() throws Exception {
-               try {
-                       uploadJarFile(BLOB_SERVER, sslClientConfig);
-                       fail("SSL client connected to non-ssl server");
-               } catch (Exception e) {
-                       // Exception expected
-               }
+       public void testNonSSLConnection2() throws Exception {
+               uploadJarFile(BLOB_SERVER, nonSslClientConfig);
        }
 
        /**
-        * Verify non-ssl client to ssl server failure
+        * Verify non-ssl connection sanity
         */
        @Test
-       public void testSSLServerFailure() throws Exception {
-               try {
-                       uploadJarFile(BLOB_SSL_SERVER, clientConfig);
-                       fail("Non-SSL client connected to ssl server");
-               } catch (Exception e) {
-                       // Exception expected
-               }
+       public void testNonSSLConnection3() throws Exception {
+               uploadJarFile(BLOB_NON_SSL_SERVER, clientConfig);
        }
 
        /**
         * Verify non-ssl connection sanity
         */
        @Test
-       public void testNonSSLConnection() throws Exception {
-               uploadJarFile(BLOB_SERVER, clientConfig);
+       public void testNonSSLConnection4() throws Exception {
+               uploadJarFile(BLOB_NON_SSL_SERVER, nonSslClientConfig);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 0a8f738..2932f41 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -18,8 +18,14 @@
 
 package org.apache.flink.runtime.blob;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.EOFException;
 import java.io.File;
@@ -32,12 +38,9 @@ import java.security.MessageDigest;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.Path;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * This class contains unit tests for the {@link BlobClient}.
@@ -47,19 +50,27 @@ public class BlobClientTest {
        /** The buffer size used during the tests in bytes. */
        private static final int TEST_BUFFER_SIZE = 17 * 1000;
 
-       /** The instance of the BLOB server used during the tests. */
-       private static BlobServer BLOB_SERVER;
+       /** The instance of the (non-ssl) BLOB server used during the tests. */
+       static BlobServer BLOB_SERVER;
+
+       /** The blob service (non-ssl) client configuration */
+       static Configuration clientConfig;
 
-       /** The blob service client and server configuration */
-       private static Configuration blobServiceConfig;
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
        /**
         * Starts the BLOB server.
         */
        @BeforeClass
        public static void startServer() throws IOException {
-               blobServiceConfig = new Configuration();
-               BLOB_SERVER = new BlobServer(blobServiceConfig, new 
VoidBlobStore());
+               Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
+
+               BLOB_SERVER = new BlobServer(config, new VoidBlobStore());
+
+               clientConfig = new Configuration();
        }
 
        /**
@@ -134,19 +145,21 @@ public class BlobClientTest {
         *         thrown if an I/O error occurs while reading the input stream
         */
        private static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException {
+               byte[] receivedBuffer = new byte[buf.length];
 
                int bytesReceived = 0;
 
                while (true) {
 
-                       final int read = inputStream.read(buf, bytesReceived, buf.length - bytesReceived);
+                       final int read = inputStream.read(receivedBuffer, bytesReceived, receivedBuffer.length - bytesReceived);
                        if (read < 0) {
                                throw new EOFException();
                        }
                        bytesReceived += read;
 
-                       if (bytesReceived == buf.length) {
+                       if (bytesReceived == receivedBuffer.length) {
                                assertEquals(-1, inputStream.read());
+                               assertArrayEquals(buf, receivedBuffer);
                                return;
                        }
                }
@@ -204,8 +217,8 @@ public class BlobClientTest {
                        md.update(testBuffer);
                        BlobKey origKey = new BlobKey(md.digest());
 
-                       InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
-                       client = new BlobClient(serverAddress, blobServiceConfig);
+                       InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
+                       client = new BlobClient(serverAddress, getBlobClientConfig());
 
                        // Store the data
                        BlobKey receivedKey = client.put(testBuffer);
@@ -232,11 +245,19 @@ public class BlobClientTest {
                        if (client != null) {
                                try {
                                        client.close();
-                               } catch (Throwable t) {}
+                               } catch (Throwable ignored) {}
                        }
                }
        }
 
+       protected Configuration getBlobClientConfig() {
+               return clientConfig;
+       }
+
+       protected BlobServer getBlobServer() {
+               return BLOB_SERVER;
+       }
+
        /**
         * Tests the PUT/GET operations for content-addressable streams.
         */
@@ -252,8 +273,8 @@ public class BlobClientTest {
 
                        BlobKey origKey = prepareTestFile(testFile);
 
-                       InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
-                       client = new BlobClient(serverAddress, blobServiceConfig);
+                       InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
+                       client = new BlobClient(serverAddress, getBlobClientConfig());
 
                        // Store the data
                        is = new FileInputStream(testFile);
@@ -275,12 +296,12 @@ public class BlobClientTest {
                        if (is != null) {
                                try {
                                        is.close();
-                               } catch (Throwable t) {}
+                               } catch (Throwable ignored) {}
                        }
                        if (client != null) {
                                try {
                                        client.close();
-                               } catch (Throwable t) {}
+                               } catch (Throwable ignored) {}
                        }
                }
        }
@@ -290,18 +311,25 @@ public class BlobClientTest {
         */
        @Test
        public void testUploadJarFilesHelper() throws Exception {
+               uploadJarFile(getBlobServer(), getBlobClientConfig());
+       }
+
+       /**
+        * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper.
+        */
+       static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception {
                final File testFile = File.createTempFile("testfile", ".dat");
                testFile.deleteOnExit();
                prepareTestFile(testFile);
 
-               InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort());
+               InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
 
-               List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobServiceConfig,
+               List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig,
                        Collections.singletonList(new Path(testFile.toURI())));
 
                assertEquals(1, blobKeys.size());
 
-               try (BlobClient blobClient = new BlobClient(serverAddress, blobServiceConfig)) {
+               try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) {
                        InputStream is = blobClient.get(blobKeys.get(0));
                        validateGet(is, testFile);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index c2a3a7a..3c7711d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -54,7 +55,8 @@ public class BlobRecoveryITCase extends TestLogger {
                Configuration config = new Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
-               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath());
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath());
 
                BlobStoreService blobStoreService = null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 22271af..5db9568 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
@@ -51,6 +54,9 @@ public class BlobServerDeleteTest extends TestLogger {
 
        private final Random rnd = new Random();
 
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
        @Test
        public void testDeleteSingleByBlobKey() {
                BlobServer server = null;
@@ -58,7 +64,9 @@ public class BlobServerDeleteTest extends TestLogger {
                BlobStore blobStore = new VoidBlobStore();
 
                try {
-                       Configuration config = new Configuration();
+                       final Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
                        server = new BlobServer(config, blobStore);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
@@ -124,7 +132,9 @@ public class BlobServerDeleteTest extends TestLogger {
                BlobStore blobStore = new VoidBlobStore();
 
                try {
-                       Configuration config = new Configuration();
+                       final Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
                        server = new BlobServer(config, blobStore);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
@@ -171,7 +181,9 @@ public class BlobServerDeleteTest extends TestLogger {
                File blobFile = null;
                File directory = null;
                try {
-                       Configuration config = new Configuration();
+                       final Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
                        server = new BlobServer(config, blobStore);
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
@@ -222,7 +234,9 @@ public class BlobServerDeleteTest extends TestLogger {
         */
        @Test
        public void testConcurrentDeleteOperations() throws IOException, 
ExecutionException, InterruptedException {
-               final Configuration configuration = new Configuration();
+               final Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
                final BlobStore blobStore = mock(BlobStore.class);
 
                final int concurrentDeleteOperations = 3;
@@ -232,7 +246,7 @@ public class BlobServerDeleteTest extends TestLogger {
 
                final byte[] data = {1, 2, 3};
 
-               try (final BlobServer blobServer = new 
BlobServer(configuration, blobStore)) {
+               try (final BlobServer blobServer = new BlobServer(config, 
blobStore)) {
 
                        final BlobKey blobKey;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 73827bc..bd27d70 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -74,7 +74,9 @@ public class BlobServerGetTest extends TestLogger {
                BlobClient client = null;
 
                try {
-                       Configuration config = new Configuration();
+                       final Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
                        server = new BlobServer(config, new VoidBlobStore());
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
@@ -114,7 +116,9 @@ public class BlobServerGetTest extends TestLogger {
                BlobClient client = null;
 
                try {
-                       Configuration config = new Configuration();
+                       final Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
                        server = new BlobServer(config, new VoidBlobStore());
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
@@ -130,9 +134,10 @@ public class BlobServerGetTest extends TestLogger {
                        // issue a GET request that succeeds
                        InputStream is = client.get(key);
 
-                       byte[] receiveBuffer = new byte[50000];
-                       BlobUtils.readFully(is, receiveBuffer, 0, receiveBuffer.length, null);
-                       BlobUtils.readFully(is, receiveBuffer, 0, receiveBuffer.length, null);
+                       byte[] receiveBuffer = new byte[data.length];
+                       int firstChunkLen = 50000;
+                       BlobUtils.readFully(is, receiveBuffer, 0, firstChunkLen, null);
+                       BlobUtils.readFully(is, receiveBuffer, firstChunkLen, firstChunkLen, null);
 
                        // shut down the server
                        for (BlobServerConnection conn : 
server.getCurrentActiveConnections()) {
@@ -140,10 +145,10 @@ public class BlobServerGetTest extends TestLogger {
                        }
 
                        try {
-                               byte[] remainder = new byte[data.length - 2*receiveBuffer.length];
-                               BlobUtils.readFully(is, remainder, 0, remainder.length, null);
+                               BlobUtils.readFully(is, receiveBuffer, 2 * firstChunkLen, data.length - 2 * firstChunkLen, null);
                                // we tolerate that this succeeds, as the receiver socket may have buffered
-                               // everything already
+                               // everything already, but in this case, also verify the contents
+                               assertArrayEquals(data, receiveBuffer);
                        }
                        catch (IOException e) {
                                // expected
@@ -165,9 +170,9 @@ public class BlobServerGetTest extends TestLogger {
         */
        @Test
        public void testConcurrentGetOperations() throws IOException, 
ExecutionException, InterruptedException {
-               final Configuration configuration = new Configuration();
 
-               configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+               final Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
 
                final BlobStore blobStore = mock(BlobStore.class);
 
@@ -197,7 +202,7 @@ public class BlobServerGetTest extends TestLogger {
 
                final ExecutorService executor = 
Executors.newFixedThreadPool(numberConcurrentGetOperations);
 
-               try (final BlobServer blobServer = new 
BlobServer(configuration, blobStore)) {
+               try (final BlobServer blobServer = new BlobServer(config, 
blobStore)) {
                        for (int i = 0; i < numberConcurrentGetOperations; i++) 
{
                                CompletableFuture<InputStream> getOperation = 
CompletableFuture.supplyAsync(
                                        () -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 80c6822..c479167 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
@@ -25,7 +26,9 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
@@ -62,6 +65,8 @@ public class BlobServerPutTest extends TestLogger {
 
        private final Random rnd = new Random();
 
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
        // --- concurrency tests for utility methods which could fail during 
the put operation ---
 
@@ -88,7 +93,10 @@ public class BlobServerPutTest extends TestLogger {
         */
        @Test
        public void testServerContentAddressableGetStorageLocationConcurrent() 
throws Exception {
-               BlobServer server = new BlobServer(new Configuration(), new 
VoidBlobStore());
+               final Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
+               BlobServer server = new BlobServer(config, new VoidBlobStore());
 
                try {
                        BlobKey key = new BlobKey();
@@ -131,7 +139,9 @@ public class BlobServerPutTest extends TestLogger {
                BlobClient client = null;
 
                try {
-                       Configuration config = new Configuration();
+                       final Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
                        server = new BlobServer(config, new VoidBlobStore());
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
@@ -185,7 +195,9 @@ public class BlobServerPutTest extends TestLogger {
                BlobClient client = null;
 
                try {
-                       Configuration config = new Configuration();
+                       final Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
                        server = new BlobServer(config, new VoidBlobStore());
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
@@ -219,7 +231,9 @@ public class BlobServerPutTest extends TestLogger {
                BlobClient client = null;
 
                try {
-                       Configuration config = new Configuration();
+                       final Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
                        server = new BlobServer(config, new VoidBlobStore());
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
@@ -252,7 +266,9 @@ public class BlobServerPutTest extends TestLogger {
 
                File tempFileDir = null;
                try {
-                       Configuration config = new Configuration();
+                       final Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
                        server = new BlobServer(config, new VoidBlobStore());
 
                        // make sure the blob server cannot create any files in 
its storage dir
@@ -305,7 +321,9 @@ public class BlobServerPutTest extends TestLogger {
         */
        @Test
        public void testConcurrentPutOperations() throws IOException, 
ExecutionException, InterruptedException {
-               final Configuration configuration = new Configuration();
+               final Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
                BlobStore blobStore = mock(BlobStore.class);
                int concurrentPutOperations = 2;
                int dataSize = 1024;
@@ -318,7 +336,7 @@ public class BlobServerPutTest extends TestLogger {
                ExecutorService executor = 
Executors.newFixedThreadPool(concurrentPutOperations);
 
                try (
-                       final BlobServer blobServer = new 
BlobServer(configuration, blobStore)) {
+                       final BlobServer blobServer = new BlobServer(config, 
blobStore)) {
 
                        for (int i = 0; i < concurrentPutOperations; i++) {
                                CompletableFuture<BlobKey> putFuture = 
CompletableFuture.supplyAsync(

http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
index 120d86a..834d367 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java
@@ -23,7 +23,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.net.ServerSocket;
@@ -32,6 +34,10 @@ import java.net.ServerSocket;
  * Tests to ensure that the BlobServer properly starts on a specified range of 
available ports.
  */
 public class BlobServerRangeTest extends TestLogger {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
        /**
         * Start blob server on 0 = pick an ephemeral port
         */
@@ -39,6 +45,8 @@ public class BlobServerRangeTest extends TestLogger {
        public void testOnEphemeralPort() throws IOException {
                Configuration conf = new Configuration();
                conf.setString(BlobServerOptions.PORT, "0");
+               conf.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
+
                BlobServer srv = new BlobServer(conf, new VoidBlobStore());
                srv.close();
        }
@@ -60,6 +68,7 @@ public class BlobServerRangeTest extends TestLogger {
 
                Configuration conf = new Configuration();
                conf.setString(BlobServerOptions.PORT, 
String.valueOf(socket.getLocalPort()));
+               conf.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
 
                // this thing is going to throw an exception
                try {
@@ -89,6 +98,7 @@ public class BlobServerRangeTest extends TestLogger {
                int availablePort = NetUtils.getAvailablePort();
                Configuration conf = new Configuration();
                conf.setString(BlobServerOptions.PORT, 
sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + 
availablePort);
+               conf.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
 
                // this thing is going to throw an exception
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
index e9705cf..2987c39 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
@@ -25,12 +25,12 @@ import static org.mockito.Mockito.mock;
 import org.apache.flink.util.OperatingSystem;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Test;
 import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
-import org.junit.rules.TemporaryFolder;
 
 public class BlobUtilsTest {
 
@@ -54,7 +54,7 @@ public class BlobUtilsTest {
 
        @After
        public void after() {
-               // Cleanup test directory
+               // Cleanup test directory, ensure it was empty
                assertTrue(blobUtilsTestDirectory.delete());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index a727294..9d2bd55 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.execution.librarycache;
 
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobClient;
@@ -27,7 +28,9 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.OperatingSystem;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.assumeTrue;
@@ -43,6 +46,9 @@ import java.util.List;
 
 public class BlobLibraryCacheManagerTest {
 
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
        /**
         * Tests that the {@link BlobLibraryCacheManager} cleans up after 
calling {@link
         * BlobLibraryCacheManager#unregisterJob(JobID)}.
@@ -59,6 +65,9 @@ public class BlobLibraryCacheManagerTest {
 
                try {
                        Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                               temporaryFolder.newFolder().getAbsolutePath());
+
                        server = new BlobServer(config, new VoidBlobStore());
                        InetSocketAddress blobSocketAddress = new 
InetSocketAddress(server.getPort());
                        BlobClient bc = new BlobClient(blobSocketAddress, 
config);
@@ -179,6 +188,9 @@ public class BlobLibraryCacheManagerTest {
 
                try {
                        Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                               temporaryFolder.newFolder().getAbsolutePath());
+
                        server = new BlobServer(config, new VoidBlobStore());
                        InetSocketAddress blobSocketAddress = new 
InetSocketAddress(server.getPort());
                        BlobClient bc = new BlobClient(blobSocketAddress, 
config);
@@ -249,6 +261,9 @@ public class BlobLibraryCacheManagerTest {
                try {
                        // create the blob transfer services
                        Configuration config = new Configuration();
+                       config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                               temporaryFolder.newFolder().getAbsolutePath());
+
                        server = new BlobServer(config, new VoidBlobStore());
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
                        cache = new BlobCache(serverAddress, config, new 
VoidBlobStore());

http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index 16e3a05..02f121b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.execution.librarycache;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -70,7 +71,10 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
                Configuration config = new Configuration();
                config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
                config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
-               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.getRoot().getAbsolutePath());
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
+               config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
+                       temporaryFolder.newFolder().getAbsolutePath());
 
                try {
                        blobStoreService = 
BlobUtils.createBlobStoreFromConfig(config);
@@ -153,7 +157,8 @@ public class BlobLibraryCacheRecoveryITCase extends 
TestLogger {
 
                        // Verify everything is clean below recoveryDir/<cluster_id>
                        final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID);
-                       File haBlobStoreDir = new File(temporaryFolder.getRoot(), clusterId);
+                       String haBlobStorePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
+                       File haBlobStoreDir = new File(haBlobStorePath, clusterId);
                        File[] recoveryFiles = haBlobStoreDir.listFiles();
                        assertNotNull("HA storage directory does not exist", recoveryFiles);
                        assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);

Reply via email to