Repository: flink Updated Branches: refs/heads/master 305098614 -> c26c2e7b5
[FLINK-7053][blob] Improve code quality in BlobServer related tests This lets BlobClientSslTest extend BlobClientTest as most of its implementation came from there and was simply copied. [FLINK-7053][blob] verify some of the buffers returned by GET [FLINK-7053][blob] use TemporaryFolder for local BLOB dir in unit tests This replaces the use of some temporary directory where it is not guaranteed that it will be deleted after the test. This closes #4234. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c26c2e7b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c26c2e7b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c26c2e7b Branch: refs/heads/master Commit: c26c2e7b5c12fc2442446a6fb2d801eb87022c12 Parents: 3050986 Author: Nico Kruber <n...@data-artisans.com> Authored: Thu Jun 22 17:31:17 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Aug 7 12:01:24 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flink/hdfstests/HDFSTest.java | 8 + .../runtime/blob/BlobCacheRetriesTest.java | 11 +- .../runtime/blob/BlobCacheSuccessTest.java | 68 ++++- .../flink/runtime/blob/BlobClientSslTest.java | 256 +++++-------------- .../flink/runtime/blob/BlobClientTest.java | 80 ++++-- .../flink/runtime/blob/BlobRecoveryITCase.java | 4 +- .../runtime/blob/BlobServerDeleteTest.java | 24 +- .../flink/runtime/blob/BlobServerGetTest.java | 27 +- .../flink/runtime/blob/BlobServerPutTest.java | 32 ++- .../flink/runtime/blob/BlobServerRangeTest.java | 10 + .../flink/runtime/blob/BlobUtilsTest.java | 6 +- .../BlobLibraryCacheManagerTest.java | 15 ++ .../BlobLibraryCacheRecoveryITCase.java | 9 +- 13 files changed, 296 insertions(+), 254 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---------------------------------------------------------------------- diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java index c490c9f..f70845b 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.api.java.io.AvroOutputFormat; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -48,7 +49,9 @@ import org.junit.Assert; import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; @@ -70,6 +73,9 @@ public class HDFSTest { private org.apache.hadoop.fs.Path hdPath; protected org.apache.hadoop.fs.FileSystem hdfs; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @BeforeClass public static void verifyOS() { Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows()); @@ -242,6 +248,8 @@ public class HDFSTest { config = new org.apache.flink.configuration.Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER"); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); BlobStoreService blobStoreService = null; http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java index 1cf77ea..366b592 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.junit.Rule; @@ -45,6 +46,8 @@ public class BlobCacheRetriesTest { @Test public void testBlobFetchRetries() throws IOException { final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); testBlobFetchRetries(config, new VoidBlobStore()); } @@ -56,9 +59,11 @@ public class BlobCacheRetriesTest { @Test public void testBlobFetchRetriesHa() throws IOException { final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath()); + temporaryFolder.newFolder().getPath()); BlobStoreService blobStoreService = null; @@ -136,6 +141,8 @@ public class BlobCacheRetriesTest { @Test public void testBlobFetchWithTooManyFailures() throws IOException { final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); testBlobFetchWithTooManyFailures(config, new VoidBlobStore()); } @@ -147,6 +154,8 @@ public class BlobCacheRetriesTest { @Test public void testBlobFetchWithTooManyFailuresHa() throws IOException { final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java index 2a65a3b..51be1b0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java @@ -18,8 +18,11 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.util.Preconditions; + import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -52,6 +55,9 @@ public class BlobCacheSuccessTest { @Test public void testBlobCache() throws IOException { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + uploadFileGetTest(config, false, false); } @@ -63,27 +69,63 @@ public class BlobCacheSuccessTest { @Test public void testBlobCacheHa() throws IOException { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath()); + temporaryFolder.newFolder().getPath()); uploadFileGetTest(config, true, true); } /** + * BlobCache is configured in HA mode and the cache can download files from + * the file system directly and does not need to download BLOBs from the + * BlobServer. + */ + @Test + public void testBlobCacheHa2() throws IOException { + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.newFolder().getPath()); + uploadFileGetTest(config, false, true); + } + + /** * BlobCache is configured in HA mode but the cache itself cannot access the * file system and thus needs to download BLOBs from the BlobServer. */ @Test public void testBlobCacheHaFallback() throws IOException { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath()); + temporaryFolder.newFolder().getPath()); uploadFileGetTest(config, false, false); } - private void uploadFileGetTest(final Configuration config, boolean cacheWorksWithoutServer, - boolean cacheHasAccessToFs) throws IOException { + /** + * Uploads two different BLOBs to the {@link BlobServer} via a {@link BlobClient} and verifies + * we can access the files from a {@link BlobCache}. + * + * @param config + * configuration to use for the server and cache (the final cache's configuration will + * actually get some modifications) + * @param shutdownServerAfterUpload + * whether the server should be shut down after uploading the BLOBs (only useful with HA mode) + * - this implies that the cache has access to the shared <tt>HA_STORAGE_PATH</tt> + * @param cacheHasAccessToFs + * whether the cache should have access to a shared <tt>HA_STORAGE_PATH</tt> (only useful with + * HA mode) + */ + private void uploadFileGetTest(final Configuration config, boolean shutdownServerAfterUpload, + boolean cacheHasAccessToFs) throws IOException { + Preconditions.checkArgument(!shutdownServerAfterUpload || cacheHasAccessToFs); + // First create two BLOBs and upload them to BLOB server final byte[] buf = new byte[128]; final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2); @@ -92,15 +134,15 @@ public class BlobCacheSuccessTest { BlobCache blobCache = null; BlobStoreService blobStoreService = null; try { - final Configuration cacheConfig; - if (cacheHasAccessToFs) { - cacheConfig = config; - } else { - // just in case parameters are still read from the server, - // create a separate configuration object for the cache - cacheConfig = new Configuration(config); + final Configuration cacheConfig = new Configuration(config); + cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + if (!cacheHasAccessToFs) { + // make sure the cache cannot access the HA store directly + cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, - temporaryFolder.getRoot().getPath() + "/does-not-exist"); + temporaryFolder.newFolder().getPath() + "/does-not-exist"); } blobStoreService = BlobUtils.createBlobStoreFromConfig(cacheConfig); @@ -124,7 +166,7 @@ public class BlobCacheSuccessTest { } } - if (cacheWorksWithoutServer) { + if (shutdownServerAfterUpload) { // Now, shut down the BLOB server, the BLOBs must still be accessible through the cache. blobServer.close(); blobServer = null; http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index a5189ea..790514c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -18,48 +18,36 @@ package org.apache.flink.runtime.blob; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetSocketAddress; -import java.security.MessageDigest; -import java.util.Collections; -import java.util.List; - import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.SecurityOptions; -import org.apache.flink.core.fs.Path; -import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; /** * This class contains unit tests for the {@link BlobClient} with ssl enabled. */ -public class BlobClientSslTest extends TestLogger { - - /** The buffer size used during the tests in bytes. */ - private static final int TEST_BUFFER_SIZE = 17 * 1000; +public class BlobClientSslTest extends BlobClientTest { /** The instance of the SSL BLOB server used during the tests. */ private static BlobServer BLOB_SSL_SERVER; - /** The SSL blob service client configuration */ + /** Instance of a non-SSL BLOB server with SSL-enabled security options. */ + private static BlobServer BLOB_NON_SSL_SERVER; + + /** The SSL blob service client configuration. */ private static Configuration sslClientConfig; - /** The instance of the non-SSL BLOB server used during the tests. */ - private static BlobServer BLOB_SERVER; + /** The non-SSL blob service client configuration with SSL-enabled security options. */ + private static Configuration nonSslClientConfig; - /** The non-ssl blob service client configuration */ - private static Configuration clientConfig; + @ClassRule + public static TemporaryFolder temporarySslFolder = new TemporaryFolder(); /** * Starts the SSL enabled BLOB server. @@ -67,6 +55,8 @@ public class BlobClientSslTest extends TestLogger { @BeforeClass public static void startSSLServer() throws IOException { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporarySslFolder.newFolder().getAbsolutePath()); config.setBoolean(SecurityOptions.SSL_ENABLED, true); config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); @@ -79,24 +69,23 @@ public class BlobClientSslTest extends TestLogger { sslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); } - /** - * Starts the SSL disabled BLOB server. - */ @BeforeClass public static void startNonSSLServer() throws IOException { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporarySslFolder.newFolder().getAbsolutePath()); config.setBoolean(SecurityOptions.SSL_ENABLED, true); config.setBoolean(BlobServerOptions.SSL_ENABLED, false); config.setString(SecurityOptions.SSL_KEYSTORE, "src/test/resources/local127.keystore"); config.setString(SecurityOptions.SSL_KEYSTORE_PASSWORD, "password"); config.setString(SecurityOptions.SSL_KEY_PASSWORD, "password"); - BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); + BLOB_NON_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); - clientConfig = new Configuration(); - clientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); - clientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false); - clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); - clientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); + nonSslClientConfig = new Configuration(); + nonSslClientConfig.setBoolean(SecurityOptions.SSL_ENABLED, true); + nonSslClientConfig.setBoolean(BlobServerOptions.SSL_ENABLED, false); + nonSslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE, "src/test/resources/local127.truststore"); + nonSslClientConfig.setString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD, "password"); } /** @@ -107,195 +96,92 @@ public class BlobClientSslTest extends TestLogger { if (BLOB_SSL_SERVER != null) { BLOB_SSL_SERVER.close(); } - - if (BLOB_SERVER != null) { - BLOB_SERVER.close(); + if (BLOB_NON_SSL_SERVER != null) { + BLOB_NON_SSL_SERVER.close(); } } - /** - * Prepares a test file for the unit tests, i.e. the methods fills the file with a particular byte patterns and - * computes the file's BLOB key. - * - * @param file - * the file to prepare for the unit tests - * @return the BLOB key of the prepared file - * @throws IOException - * thrown if an I/O error occurs while writing to the test file - */ - private static BlobKey prepareTestFile(File file) throws IOException { - - MessageDigest md = BlobUtils.createMessageDigest(); - - final byte[] buf = new byte[TEST_BUFFER_SIZE]; - for (int i = 0; i < buf.length; ++i) { - buf[i] = (byte) (i % 128); - } - - FileOutputStream fos = null; - try { - fos = new FileOutputStream(file); - - for (int i = 0; i < 20; ++i) { - fos.write(buf); - md.update(buf); - } - - } finally { - if (fos != null) { - fos.close(); - } - } + protected Configuration getBlobClientConfig() { + return sslClientConfig; + } - return new BlobKey(md.digest()); + protected BlobServer getBlobServer() { + return BLOB_SSL_SERVER; } /** - * Validates the result of a GET operation by comparing the data from the retrieved input stream to the content of - * the specified file. - * - * @param inputStream - * the input stream returned from the GET operation - * @param file - * the file to compare the input stream's data to - * @throws IOException - * thrown if an I/O error occurs while reading the input stream or the file + * Verify ssl client to ssl server upload */ - private static void validateGet(final InputStream inputStream, final File file) throws IOException { - - InputStream inputStream2 = null; - try { - - inputStream2 = new FileInputStream(file); - - while (true) { - - final int r1 = inputStream.read(); - final int r2 = inputStream2.read(); - - assertEquals(r2, r1); - - if (r1 < 0) { - break; - } - } - - } finally { - if (inputStream2 != null) { - inputStream2.close(); - } - } - + @Test + public void testUploadJarFilesHelper() throws Exception { + uploadJarFile(BLOB_SSL_SERVER, sslClientConfig); } /** - * Tests the PUT/GET operations for content-addressable streams. + * Verify ssl client to non-ssl server failure */ - @Test - public void testContentAddressableStream() { - - BlobClient client = null; - InputStream is = null; - - try { - File testFile = File.createTempFile("testfile", ".dat"); - testFile.deleteOnExit(); - - BlobKey origKey = prepareTestFile(testFile); - - InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SSL_SERVER.getPort()); - client = new BlobClient(serverAddress, sslClientConfig); - - // Store the data - is = new FileInputStream(testFile); - BlobKey receivedKey = client.put(is); - assertEquals(origKey, receivedKey); - - is.close(); - is = null; - - // Retrieve the data - is = client.get(receivedKey); - validateGet(is, testFile); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { - if (is != null) { - try { - is.close(); - } catch (Throwable t) {} - } - if (client != null) { - try { - client.close(); - } catch (Throwable t) {} - } - } + @Test(expected = IOException.class) + public void testSSLClientFailure() throws Exception { + // SSL client connected to non-ssl server + uploadJarFile(BLOB_SERVER, sslClientConfig); } /** - * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper. + * Verify ssl client to non-ssl server failure */ - private void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception { - final File testFile = File.createTempFile("testfile", ".dat"); - testFile.deleteOnExit(); - prepareTestFile(testFile); - - InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort()); - - List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig, - Collections.singletonList(new Path(testFile.toURI()))); + @Test(expected = IOException.class) + public void testSSLClientFailure2() throws Exception { + // SSL client connected to non-ssl server + uploadJarFile(BLOB_NON_SSL_SERVER, sslClientConfig); + } - assertEquals(1, blobKeys.size()); + /** + * Verify non-ssl client to ssl server failure + */ + @Test(expected = IOException.class) + public void testSSLServerFailure() throws Exception { + // Non-SSL client connected to ssl server + uploadJarFile(BLOB_SSL_SERVER, clientConfig); + } - try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) { - InputStream is = blobClient.get(blobKeys.get(0)); - validateGet(is, testFile); - } + /** + * Verify non-ssl client to ssl server failure + */ + @Test(expected = IOException.class) + public void testSSLServerFailure2() throws Exception { + // Non-SSL client connected to ssl server + uploadJarFile(BLOB_SSL_SERVER, nonSslClientConfig); } /** - * Verify ssl client to ssl server upload + * Verify non-ssl connection sanity */ @Test - public void testUploadJarFilesHelper() throws Exception { - uploadJarFile(BLOB_SSL_SERVER, sslClientConfig); + public void testNonSSLConnection() throws Exception { + uploadJarFile(BLOB_SERVER, clientConfig); } /** - * Verify ssl client to non-ssl server failure + * Verify non-ssl connection sanity */ @Test - public void testSSLClientFailure() throws Exception { - try { - uploadJarFile(BLOB_SERVER, sslClientConfig); - fail("SSL client connected to non-ssl server"); - } catch (Exception e) { - // Exception expected - } + public void testNonSSLConnection2() throws Exception { + uploadJarFile(BLOB_SERVER, nonSslClientConfig); } /** - * Verify non-ssl client to ssl server failure + * Verify non-ssl connection sanity */ @Test - public void testSSLServerFailure() throws Exception { - try { - uploadJarFile(BLOB_SSL_SERVER, clientConfig); - fail("Non-SSL client connected to ssl server"); - } catch (Exception e) { - // Exception expected - } + public void testNonSSLConnection3() throws Exception { + uploadJarFile(BLOB_NON_SSL_SERVER, clientConfig); } /** * Verify non-ssl connection sanity */ @Test - public void testNonSSLConnection() throws Exception { - uploadJarFile(BLOB_SERVER, clientConfig); + public void testNonSSLConnection4() throws Exception { + uploadJarFile(BLOB_NON_SSL_SERVER, nonSslClientConfig); } } http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index 0a8f738..2932f41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -18,8 +18,14 @@ package org.apache.flink.runtime.blob; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.EOFException; import java.io.File; @@ -32,12 +38,9 @@ import java.security.MessageDigest; import java.util.Collections; import java.util.List; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.api.common.JobID; -import org.apache.flink.core.fs.Path; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; /** * This class contains unit tests for the {@link BlobClient}. @@ -47,19 +50,27 @@ public class BlobClientTest { /** The buffer size used during the tests in bytes. */ private static final int TEST_BUFFER_SIZE = 17 * 1000; - /** The instance of the BLOB server used during the tests. */ - private static BlobServer BLOB_SERVER; + /** The instance of the (non-ssl) BLOB server used during the tests. */ + static BlobServer BLOB_SERVER; + + /** The blob service (non-ssl) client configuration */ + static Configuration clientConfig; - /** The blob service client and server configuration */ - private static Configuration blobServiceConfig; + @ClassRule + public static TemporaryFolder temporaryFolder = new TemporaryFolder(); /** * Starts the BLOB server. */ @BeforeClass public static void startServer() throws IOException { - blobServiceConfig = new Configuration(); - BLOB_SERVER = new BlobServer(blobServiceConfig, new VoidBlobStore()); + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + + BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); + + clientConfig = new Configuration(); } /** @@ -134,19 +145,21 @@ public class BlobClientTest { * thrown if an I/O error occurs while reading the input stream */ private static void validateGet(final InputStream inputStream, final byte[] buf) throws IOException { + byte[] receivedBuffer = new byte[buf.length]; int bytesReceived = 0; while (true) { - final int read = inputStream.read(buf, bytesReceived, buf.length - bytesReceived); + final int read = inputStream.read(receivedBuffer, bytesReceived, receivedBuffer.length - bytesReceived); if (read < 0) { throw new EOFException(); } bytesReceived += read; - if (bytesReceived == buf.length) { + if (bytesReceived == receivedBuffer.length) { assertEquals(-1, inputStream.read()); + assertArrayEquals(buf, receivedBuffer); return; } } @@ -204,8 +217,8 @@ public class BlobClientTest { md.update(testBuffer); BlobKey origKey = new BlobKey(md.digest()); - InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); - client = new BlobClient(serverAddress, blobServiceConfig); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort()); + client = new BlobClient(serverAddress, getBlobClientConfig()); // Store the data BlobKey receivedKey = client.put(testBuffer); @@ -232,11 +245,19 @@ public class BlobClientTest { if (client != null) { try { client.close(); - } catch (Throwable t) {} + } catch (Throwable ignored) {} } } } + protected Configuration getBlobClientConfig() { + return clientConfig; + } + + protected BlobServer getBlobServer() { + return BLOB_SERVER; + } + /** * Tests the PUT/GET operations for content-addressable streams. */ @@ -252,8 +273,8 @@ public class BlobClientTest { BlobKey origKey = prepareTestFile(testFile); - InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); - client = new BlobClient(serverAddress, blobServiceConfig); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", getBlobServer().getPort()); + client = new BlobClient(serverAddress, getBlobClientConfig()); // Store the data is = new FileInputStream(testFile); @@ -275,12 +296,12 @@ public class BlobClientTest { if (is != null) { try { is.close(); - } catch (Throwable t) {} + } catch (Throwable ignored) {} } if (client != null) { try { client.close(); - } catch (Throwable t) {} + } catch (Throwable ignored) {} } } } @@ -290,18 +311,25 @@ public class BlobClientTest { */ @Test public void testUploadJarFilesHelper() throws Exception { + uploadJarFile(getBlobServer(), getBlobClientConfig()); + } + + /** + * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, List)} helper. + */ + static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig) throws Exception { final File testFile = File.createTempFile("testfile", ".dat"); testFile.deleteOnExit(); prepareTestFile(testFile); - InetSocketAddress serverAddress = new InetSocketAddress("localhost", BLOB_SERVER.getPort()); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort()); - List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobServiceConfig, + List<BlobKey> blobKeys = BlobClient.uploadJarFiles(serverAddress, blobClientConfig, Collections.singletonList(new Path(testFile.toURI()))); assertEquals(1, blobKeys.size()); - try (BlobClient blobClient = new BlobClient(serverAddress, blobServiceConfig)) { + try (BlobClient blobClient = new BlobClient(serverAddress, blobClientConfig)) { InputStream is = blobClient.get(blobKeys.get(0)); validateGet(is, testFile); } http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index c2a3a7a..3c7711d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -54,7 +55,8 @@ public class BlobRecoveryITCase extends TestLogger { Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getPath()); BlobStoreService blobStoreService = null; http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index 22271af..5db9568 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -18,12 +18,15 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.concurrent.FlinkFutureException; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.TestLogger; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; @@ -51,6 +54,9 @@ public class BlobServerDeleteTest extends TestLogger { private final Random rnd = new Random(); + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Test public void testDeleteSingleByBlobKey() { BlobServer server = null; @@ -58,7 +64,9 @@ public class BlobServerDeleteTest extends TestLogger { BlobStore blobStore = new VoidBlobStore(); try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -124,7 +132,9 @@ public class BlobServerDeleteTest extends TestLogger { BlobStore blobStore = new VoidBlobStore(); try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -171,7 +181,9 @@ public class BlobServerDeleteTest extends TestLogger { File blobFile = null; File directory = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -222,7 +234,9 @@ public class BlobServerDeleteTest extends TestLogger { */ @Test public void testConcurrentDeleteOperations() throws IOException, ExecutionException, InterruptedException { - final Configuration configuration = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + final BlobStore blobStore = mock(BlobStore.class); final int concurrentDeleteOperations = 3; @@ -232,7 +246,7 @@ public class BlobServerDeleteTest extends TestLogger { final byte[] data = {1, 2, 3}; - try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) { + try (final BlobServer blobServer = new BlobServer(config, blobStore)) { final BlobKey blobKey; http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java index 73827bc..bd27d70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java @@ -74,7 +74,9 @@ public class BlobServerGetTest extends TestLogger { BlobClient client = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -114,7 +116,9 @@ public class BlobServerGetTest extends TestLogger { BlobClient client = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -130,9 +134,10 @@ public class BlobServerGetTest extends TestLogger { // issue a GET request that succeeds InputStream is = client.get(key); - byte[] receiveBuffer = new byte[50000]; - BlobUtils.readFully(is, receiveBuffer, 0, receiveBuffer.length, null); - BlobUtils.readFully(is, receiveBuffer, 0, receiveBuffer.length, null); + byte[] receiveBuffer = new byte[data.length]; + int firstChunkLen = 50000; + BlobUtils.readFully(is, receiveBuffer, 0, firstChunkLen, null); + BlobUtils.readFully(is, receiveBuffer, firstChunkLen, firstChunkLen, null); // shut down the server for (BlobServerConnection conn : server.getCurrentActiveConnections()) { @@ -140,10 +145,10 @@ public class BlobServerGetTest extends TestLogger { } try { - byte[] remainder = new byte[data.length - 2*receiveBuffer.length]; - BlobUtils.readFully(is, remainder, 0, remainder.length, null); + BlobUtils.readFully(is, receiveBuffer, 2 * firstChunkLen, data.length - 2 * firstChunkLen, null); // we tolerate that this succeeds, as the receiver socket may have buffered - // everything already + // everything already, but in this case, also verify the contents + assertArrayEquals(data, receiveBuffer); } catch (IOException e) { // expected @@ -165,9 +170,9 @@ public class BlobServerGetTest extends TestLogger { */ @Test public void testConcurrentGetOperations() throws IOException, ExecutionException, InterruptedException { - final Configuration configuration = new Configuration(); - configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); final BlobStore blobStore = mock(BlobStore.class); @@ -197,7 +202,7 @@ public class BlobServerGetTest extends TestLogger { final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations); - try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) { + try (final BlobServer blobServer = new BlobServer(config, blobStore)) { for (int i = 0; i < numberConcurrentGetOperations; i++) { CompletableFuture<InputStream> getOperation = CompletableFuture.supplyAsync( () -> { http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java index 80c6822..c479167 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.blob; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.runtime.concurrent.FlinkFutureException; @@ -25,7 +26,9 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.ByteArrayInputStream; import java.io.File; @@ -62,6 +65,8 @@ public class BlobServerPutTest extends TestLogger { private final Random rnd = new Random(); + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); // --- concurrency tests for utility methods which could fail during the put operation --- @@ -88,7 +93,10 @@ public class BlobServerPutTest extends TestLogger { */ @Test public void testServerContentAddressableGetStorageLocationConcurrent() throws Exception { - BlobServer server = new BlobServer(new Configuration(), new VoidBlobStore()); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + + BlobServer server = new BlobServer(config, new VoidBlobStore()); try { BlobKey key = new BlobKey(); @@ -131,7 +139,9 @@ public class BlobServerPutTest extends TestLogger { BlobClient client = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -185,7 +195,9 @@ public class BlobServerPutTest extends TestLogger { BlobClient client = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -219,7 +231,9 @@ public class BlobServerPutTest extends TestLogger { BlobClient client = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); @@ -252,7 +266,9 @@ public class BlobServerPutTest extends TestLogger { File tempFileDir = null; try { - Configuration config = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); // make sure the blob server cannot create any files in its storage dir @@ -305,7 +321,9 @@ public class BlobServerPutTest extends TestLogger { */ @Test public void testConcurrentPutOperations() throws IOException, ExecutionException, InterruptedException { - final Configuration configuration = new Configuration(); + final Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + BlobStore blobStore = mock(BlobStore.class); int concurrentPutOperations = 2; int dataSize = 1024; @@ -318,7 +336,7 @@ public class BlobServerPutTest extends TestLogger { ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations); try ( - final BlobServer blobServer = new BlobServer(configuration, blobStore)) { + final BlobServer blobServer = new BlobServer(config, blobStore)) { for (int i = 0; i < concurrentPutOperations; i++) { CompletableFuture<BlobKey> putFuture = CompletableFuture.supplyAsync( http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java index 120d86a..834d367 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java @@ -23,7 +23,9 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.IOException; import java.net.ServerSocket; @@ -32,6 +34,10 @@ import java.net.ServerSocket; * Tests to ensure that the BlobServer properly starts on a specified range of available ports. */ public class BlobServerRangeTest extends TestLogger { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + /** * Start blob server on 0 = pick an ephemeral port */ @@ -39,6 +45,8 @@ public class BlobServerRangeTest extends TestLogger { public void testOnEphemeralPort() throws IOException { Configuration conf = new Configuration(); conf.setString(BlobServerOptions.PORT, "0"); + conf.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + BlobServer srv = new BlobServer(conf, new VoidBlobStore()); srv.close(); } @@ -60,6 +68,7 @@ public class BlobServerRangeTest extends TestLogger { Configuration conf = new Configuration(); conf.setString(BlobServerOptions.PORT, String.valueOf(socket.getLocalPort())); + conf.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); // this thing is going to throw an exception try { @@ -89,6 +98,7 @@ public class BlobServerRangeTest extends TestLogger { int availablePort = NetUtils.getAvailablePort(); Configuration conf = new Configuration(); conf.setString(BlobServerOptions.PORT, sockets[0].getLocalPort() + "," + sockets[1].getLocalPort() + "," + availablePort); + conf.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); // this thing is going to throw an exception try { http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java index e9705cf..2987c39 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java @@ -25,12 +25,12 @@ import static org.mockito.Mockito.mock; import org.apache.flink.util.OperatingSystem; import org.junit.After; import org.junit.Before; -import org.junit.Test; import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; -import org.junit.rules.TemporaryFolder; public class BlobUtilsTest { @@ -54,7 +54,7 @@ public class BlobUtilsTest { @After public void after() { - // Cleanup test directory + // Cleanup test directory, ensure it was empty assertTrue(blobUtilsTestDirectory.delete()); } http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index a727294..9d2bd55 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.execution.librarycache; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobClient; @@ -27,7 +28,9 @@ import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; import org.apache.flink.util.OperatingSystem; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import static org.junit.Assert.*; import static org.junit.Assume.assumeTrue; @@ -43,6 +46,9 @@ import java.util.List; public class BlobLibraryCacheManagerTest { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + /** * Tests that the {@link BlobLibraryCacheManager} cleans up after calling {@link * BlobLibraryCacheManager#unregisterJob(JobID)}. @@ -59,6 +65,9 @@ public class BlobLibraryCacheManagerTest { try { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort()); BlobClient bc = new BlobClient(blobSocketAddress, config); @@ -179,6 +188,9 @@ public class BlobLibraryCacheManagerTest { try { Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort()); BlobClient bc = new BlobClient(blobSocketAddress, config); @@ -249,6 +261,9 @@ public class BlobLibraryCacheManagerTest { try { // create the blob transfer services Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); cache = new BlobCache(serverAddress, config, new VoidBlobStore()); http://git-wip-us.apache.org/repos/asf/flink/blob/c26c2e7b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 16e3a05..02f121b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.execution.librarycache; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -70,7 +71,10 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath()); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, + temporaryFolder.newFolder().getAbsolutePath()); try { blobStoreService = BlobUtils.createBlobStoreFromConfig(config); @@ -153,7 +157,8 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { // Verify everything is clean below recoveryDir/<cluster_id> final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); - File haBlobStoreDir = new File(temporaryFolder.getRoot(), clusterId); + String haBlobStorePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH); + File haBlobStoreDir = new File(haBlobStorePath, clusterId); File[] recoveryFiles = haBlobStoreDir.listFiles(); assertNotNull("HA storage directory does not exist", recoveryFiles); assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);