http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java index 27603d0..f9052e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientSslTest.java @@ -36,6 +36,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -43,7 +44,7 @@ import org.junit.Test; /** * This class contains unit tests for the {@link BlobClient} with ssl enabled. */ -public class BlobClientSslTest { +public class BlobClientSslTest extends TestLogger { /** The buffer size used during the tests in bytes. */ private static final int TEST_BUFFER_SIZE = 17 * 1000; @@ -64,19 +65,14 @@ public class BlobClientSslTest { * Starts the SSL enabled BLOB server. */ @BeforeClass - public static void startSSLServer() { - try { - Configuration config = new Configuration(); - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); - BLOB_SSL_SERVER = new BlobServer(config); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startSSLServer() throws IOException { + Configuration config = new Configuration(); + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + BLOB_SSL_SERVER = new BlobServer(config, new VoidBlobStore()); + sslClientConfig = new Configuration(); sslClientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); @@ -88,20 +84,14 @@ public class BlobClientSslTest { * Starts the SSL disabled BLOB server. */ @BeforeClass - public static void startNonSSLServer() { - try { - Configuration config = new Configuration(); - config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); - config.setBoolean(BlobServerOptions.SSL_ENABLED, false); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); - config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); - config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); - BLOB_SERVER = new BlobServer(config); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startNonSSLServer() throws IOException { + Configuration config = new Configuration(); + config.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); + config.setBoolean(BlobServerOptions.SSL_ENABLED, false); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE, "src/test/resources/local127.keystore"); + config.setString(ConfigConstants.SECURITY_SSL_KEYSTORE_PASSWORD, "password"); + config.setString(ConfigConstants.SECURITY_SSL_KEY_PASSWORD, "password"); + BLOB_SERVER = new BlobServer(config, new VoidBlobStore()); clientConfig = new Configuration(); clientConfig.setBoolean(ConfigConstants.SECURITY_SSL_ENABLED, true); @@ -114,13 +104,13 @@ public class BlobClientSslTest { * Shuts the BLOB server down. */ @AfterClass - public static void stopServers() { + public static void stopServers() throws IOException { if (BLOB_SSL_SERVER != null) { - BLOB_SSL_SERVER.shutdown(); + BLOB_SSL_SERVER.close(); } if (BLOB_SERVER != null) { - BLOB_SERVER.shutdown(); + BLOB_SERVER.close(); } }
http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index 8f8f8c5..fda4ee9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -57,24 +57,18 @@ public class BlobClientTest { * Starts the BLOB server. */ @BeforeClass - public static void startServer() { - try { - blobServiceConfig = new Configuration(); - BLOB_SERVER = new BlobServer(blobServiceConfig); - } - catch (IOException e) { - e.printStackTrace(); - fail(e.getMessage()); - } + public static void startServer() throws IOException { + blobServiceConfig = new Configuration(); + BLOB_SERVER = new BlobServer(blobServiceConfig, new VoidBlobStore()); } /** * Shuts the BLOB server down. */ @AfterClass - public static void stopServer() { + public static void stopServer() throws IOException { if (BLOB_SERVER != null) { - BLOB_SERVER.shutdown(); + BLOB_SERVER.close(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index f8d50d5..4f12ddb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -30,16 +30,13 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Random; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -59,10 +56,20 @@ public class BlobRecoveryITCase extends TestLogger { config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM"); config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath()); - testBlobServerRecovery(config); + BlobStoreService blobStoreService = null; + + try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + + testBlobServerRecovery(config, blobStoreService); + } finally { + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } + } } - public static void testBlobServerRecovery(final Configuration config) throws IOException { + public static void testBlobServerRecovery(final Configuration config, final BlobStore blobStore) throws IOException { final String clusterId = config.getString(HighAvailabilityOptions.HA_CLUSTER_ID); String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/" + clusterId; Random rand = new Random(); @@ -73,7 +80,7 @@ public class BlobRecoveryITCase extends TestLogger { try { for (int i = 0; i < server.length; i++) { - server[i] = new BlobServer(config); + server[i] = new BlobServer(config, blobStore); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); } @@ -166,7 +173,7 @@ public class BlobRecoveryITCase extends TestLogger { finally { for (BlobServer s : server) { if (s != null) { - s.shutdown(); + s.close(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index 025a2ff..e8e28a1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -44,10 +44,11 @@ public class BlobServerDeleteTest { public void testDeleteSingle() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -93,10 +94,11 @@ public class BlobServerDeleteTest { public void testDeleteAll() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -156,10 +158,11 @@ public class BlobServerDeleteTest { public void testDeleteAlreadyDeletedByBlobKey() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -195,10 +198,11 @@ public class BlobServerDeleteTest { public void testDeleteAlreadyDeletedByName() { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -237,10 +241,11 @@ public class BlobServerDeleteTest { BlobServer server = null; BlobClient client = null; + BlobStore blobStore = new VoidBlobStore(); try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, blobStore); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -289,7 +294,11 @@ public class BlobServerDeleteTest { } } if (server != null) { - server.shutdown(); + try { + server.close(); + } catch (IOException e) { + e.printStackTrace(); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java index 59a62e1..6d1dba8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java @@ -40,13 +40,13 @@ public class BlobServerGetTest { private final Random rnd = new Random(); @Test - public void testGetFailsDuringLookup() { + public void testGetFailsDuringLookup() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -66,37 +66,27 @@ public class BlobServerGetTest { try { client.get(key); fail("This should not succeed."); - } - catch (IOException e) { + } catch (IOException e) { // expected } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testGetFailsDuringStreaming() { + public void testGetFailsDuringStreaming() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -129,21 +119,12 @@ public class BlobServerGetTest { catch (IOException e) { // expected } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java index c4d6d1c..441ca7d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java @@ -42,13 +42,13 @@ public class BlobServerPutTest { private final Random rnd = new Random(); @Test - public void testPutBufferSuccessful() { + public void testPutBufferSuccessful() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -95,34 +95,25 @@ public class BlobServerPutTest { BlobUtils.readFully(is3, result3, 0, result3.length, null); is3.close(); assertArrayEquals(data, result3); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testPutStreamSuccessful() { + public void testPutStreamSuccessful() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -143,12 +134,7 @@ public class BlobServerPutTest { String stringKey = "my test key"; client.put(jid, stringKey, new ByteArrayInputStream(data)); } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { try { client.close(); @@ -157,19 +143,19 @@ public class BlobServerPutTest { } } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testPutChunkedStreamSuccessful() { + public void testPutChunkedStreamSuccessful() throws IOException { BlobServer server = null; BlobClient client = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); client = new BlobClient(serverAddress, config); @@ -190,27 +176,18 @@ public class BlobServerPutTest { String stringKey = "my test key"; client.put(jid, stringKey, new ChunkedInputStream(data, 17)); } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testPutBufferFails() { + public void testPutBufferFails() throws IOException { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. BlobServer server = null; @@ -219,7 +196,7 @@ public class BlobServerPutTest { File tempFileDir = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); // make sure the blob server cannot create any files in its storage dir tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile(); @@ -250,31 +227,22 @@ public class BlobServerPutTest { // expected } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { // set writable again to make sure we can remove the directory if (tempFileDir != null) { tempFileDir.setWritable(true, false); } if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } @Test - public void testPutNamedBufferFails() { + public void testPutNamedBufferFails() throws IOException { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. BlobServer server = null; @@ -283,7 +251,7 @@ public class BlobServerPutTest { File tempFileDir = null; try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); // make sure the blob server cannot create any files in its storage dir tempFileDir = server.createTemporaryFilename().getParentFile().getParentFile(); @@ -317,25 +285,16 @@ public class BlobServerPutTest { catch (IllegalStateException e) { // expected } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { // set writable again to make sure we can remove the directory if (tempFileDir != null) { tempFileDir.setWritable(true, false); } if (client != null) { - try { - client.close(); - } catch (Throwable t) { - t.printStackTrace(); - } + client.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java index c3762aa..120d86a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRangeTest.java @@ -39,8 +39,8 @@ public class BlobServerRangeTest extends TestLogger { public void testOnEphemeralPort() throws IOException { Configuration conf = new Configuration(); conf.setString(BlobServerOptions.PORT, "0"); - BlobServer srv = new BlobServer(conf); - srv.shutdown(); + BlobServer srv = new BlobServer(conf, new VoidBlobStore()); + srv.close(); } /** @@ -63,7 +63,7 @@ public class BlobServerRangeTest extends TestLogger { // this thing is going to throw an exception try { - BlobServer srv = new BlobServer(conf); + BlobServer srv = new BlobServer(conf, new VoidBlobStore()); } finally { socket.close(); } @@ -92,9 +92,9 @@ public class BlobServerRangeTest extends TestLogger { // this thing is going to throw an exception try { - BlobServer srv = new BlobServer(conf); + BlobServer srv = new BlobServer(conf, new VoidBlobStore()); Assert.assertEquals(availablePort, srv.getPort()); - srv.shutdown(); + srv.close(); } finally { sockets[0].close(); sockets[1].close(); http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java index 93f9b73..91e119b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java @@ -28,8 +28,8 @@ public class TestingFailingBlobServer extends BlobServer { private int numFailures; - public TestingFailingBlobServer(Configuration config, int numFailures) throws IOException { - super(config); + public TestingFailingBlobServer(Configuration config, BlobStore blobStore, int numFailures) throws IOException { + super(config, blobStore); this.numFailures = numFailures; } http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 5d9ade3..98e6b3e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -18,13 +18,12 @@ package org.apache.flink.runtime.execution.librarycache; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.api.common.JobID; import org.apache.flink.util.OperatingSystem; @@ -45,7 +44,7 @@ import java.util.List; public class BlobLibraryCacheManagerTest { @Test - public void testLibraryCacheManagerCleanup() { + public void testLibraryCacheManagerCleanup() throws IOException, InterruptedException { JobID jid = new JobID(); List<BlobKey> keys = new ArrayList<BlobKey>(); @@ -56,7 +55,7 @@ public class BlobLibraryCacheManagerTest { try { Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getPort()); BlobClient bc = new BlobClient(blobSocketAddress, config); @@ -108,14 +107,9 @@ public class BlobLibraryCacheManagerTest { assertEquals(2, caughtExceptions); bc.close(); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (server != null) { - server.shutdown(); + server.close(); } if (libraryCacheManager != null) { @@ -130,7 +124,7 @@ public class BlobLibraryCacheManagerTest { } @Test - public void testRegisterAndDownload() { + public void testRegisterAndDownload() throws IOException { assumeTrue(!OperatingSystem.isWindows()); //setWritable doesn't work on Windows. BlobServer server = null; @@ -139,9 +133,9 @@ public class BlobLibraryCacheManagerTest { try { // create the blob transfer services Configuration config = new Configuration(); - server = new BlobServer(config); + server = new BlobServer(config, new VoidBlobStore()); InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); - cache = new BlobCache(serverAddress, config); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); // upload some meaningless data to the server BlobClient uploader = new BlobClient(serverAddress, config); @@ -210,22 +204,17 @@ public class BlobLibraryCacheManagerTest { catch (IOException e) { // splendid! } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - finally { + } finally { if (cacheDir != null) { if (!cacheDir.setWritable(true, false)) { System.err.println("Could not re-add write permissions to cache directory."); } } if (cache != null) { - cache.shutdown(); + cache.close(); } if (server != null) { - server.shutdown(); + server.close(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 54e1a9b..16e3a05 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -26,6 +26,8 @@ import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.BlobStoreService; +import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.TestLogger; @@ -63,6 +65,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { BlobLibraryCacheManager[] libServer = new BlobLibraryCacheManager[2]; BlobCache cache = null; BlobLibraryCacheManager libCache = null; + BlobStoreService blobStoreService = null; Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); @@ -70,8 +73,10 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath()); try { + blobStoreService = BlobUtils.createBlobStoreFromConfig(config); + for (int i = 0; i < server.length; i++) { - server[i] = new BlobServer(config); + server[i] = new BlobServer(config, blobStoreService); serverAddress[i] = new InetSocketAddress("localhost", server[i].getPort()); libServer[i] = new BlobLibraryCacheManager(server[i], 3600 * 1000); } @@ -89,7 +94,7 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { } // The cache - cache = new BlobCache(serverAddress[0], config); + cache = new BlobCache(serverAddress[0], config, blobStoreService); libCache = new BlobLibraryCacheManager(cache, 3600 * 1000); // Register uploaded libraries @@ -110,10 +115,10 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { } // Shutdown cache and start with other server - cache.shutdown(); + cache.close(); libCache.shutdown(); - cache = new BlobCache(serverAddress[1], config); + cache = new BlobCache(serverAddress[1], config, blobStoreService); libCache = new BlobLibraryCacheManager(cache, 3600 * 1000); // Verify key 1 @@ -156,17 +161,21 @@ public class BlobLibraryCacheRecoveryITCase extends TestLogger { finally { for (BlobServer s : server) { if (s != null) { - s.shutdown(); + s.close(); } } if (cache != null) { - cache.shutdown(); + cache.close(); } if (libCache != null) { libCache.shutdown(); } + + if (blobStoreService != null) { + blobStoreService.closeAndCleanupAllData(); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java index 06ffe3c..d89093d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRegistryTest.java @@ -22,11 +22,11 @@ import org.apache.curator.test.TestingServer; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus; -import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices; import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; @@ -62,7 +62,10 @@ public class ZooKeeperRegistryTest extends TestLogger { configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); final HighAvailabilityServices zkHaService = new ZooKeeperHaServices( - ZooKeeperUtils.startCuratorFramework(configuration), Executors.directExecutor(), configuration); + ZooKeeperUtils.startCuratorFramework(configuration), + Executors.directExecutor(), + configuration, + new VoidBlobStore()); final RunningJobsRegistry zkRegistry = zkHaService.getRunningJobsRegistry(); http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index b8b5984..a63b02d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -43,7 +43,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.SubtaskState; @@ -71,7 +70,6 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -102,7 +100,6 @@ import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import scala.runtime.BoxedUnit; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -190,7 +187,11 @@ public class JobManagerHARecoveryTest extends TestLogger { TestingUtils.defaultExecutor(), instanceManager, scheduler, - new BlobLibraryCacheManager(new BlobServer(flinkConfiguration), 3600000), + new BlobLibraryCacheManager( + new BlobServer( + flinkConfiguration, + testingHighAvailabilityServices.createBlobStore()), + 3600000L), archive, new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100), timeout, http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index d6257ba..70800e5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -31,6 +31,7 @@ import org.apache.curator.test.TestingServer; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; @@ -184,7 +185,7 @@ public class JobManagerLeaderElectionTest extends TestLogger { TestingUtils.defaultExecutor(), new InstanceManager(), new Scheduler(TestingUtils.defaultExecutionContext()), - new BlobLibraryCacheManager(new BlobServer(configuration), 10L), + new BlobLibraryCacheManager(new BlobServer(configuration, new VoidBlobStore()), 10L), ActorRef.noSender(), new NoRestartStrategy.NoRestartStrategyFactory(), AkkaUtils.getDefaultTimeoutAsFiniteDuration(), http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java index 0ea47f2..0282a4f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderRetrievalTest.java @@ -18,17 +18,21 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.test.TestingServer; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.ZooKeeperUtils; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -64,10 +68,13 @@ public class ZooKeeperLeaderRetrievalTest extends TestLogger{ config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); - highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, + CuratorFramework client = ZooKeeperUtils.startCuratorFramework(config); + + highAvailabilityServices = new ZooKeeperHaServices( + client, TestingUtils.defaultExecutor(), - HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); + config, + new VoidBlobStore()); } @After http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index 58f2231..d6fc48c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -97,7 +97,7 @@ public class TaskManagerMetricsTest extends TestLogger { taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), - highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + highAvailabilityServices, tmRegistry); final ActorRef taskManager = actorSystem.actorOf(tmProps); http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index 2a4c036..9dcfc70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -149,9 +149,6 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { network.start(); - LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever( - HighAvailabilityServices.DEFAULT_JOB_ID); - MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config); // create the task manager @@ -164,7 +161,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger { ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, new MetricRegistry(metricRegistryConfiguration)); taskManager = actorSystem.actorOf(tmProps); http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index 92de31a..0844aad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -26,6 +26,7 @@ import akka.testkit.JavaTestKit; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; import org.apache.flink.runtime.concurrent.Executors; @@ -57,6 +58,7 @@ import scala.concurrent.Future; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.util.Arrays; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -601,7 +603,7 @@ public class TaskManagerRegistrationTest extends TestLogger { } @Test - public void testCheckForValidRegistrationSessionIDs() { + public void testCheckForValidRegistrationSessionIDs() throws IOException { new JavaTestKit(actorSystem) {{ ActorGateway taskManagerGateway = null; @@ -612,6 +614,7 @@ public class TaskManagerRegistrationTest extends TestLogger { HighAvailabilityServices mockedHighAvailabilityServices = mock(HighAvailabilityServices.class); when(mockedHighAvailabilityServices.getJobManagerLeaderRetriever(Matchers.eq(HighAvailabilityServices.DEFAULT_JOB_ID))) .thenReturn(new StandaloneLeaderRetrievalService(getTestActor().path().toString(), trueLeaderSessionID)); + when(mockedHighAvailabilityServices.createBlobStore()).thenReturn(new VoidBlobStore()); try { // we make the test actor (the test kit) the JobManager to intercept http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties index 7ba1633..98f136a 100644 --- a/flink-runtime/src/test/resources/log4j-test.properties +++ b/flink-runtime/src/test/resources/log4j-test.properties @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -log4j.rootLogger=OFF, console +log4j.rootLogger=INFO, console # ----------------------------------------------------------------------------- # Console (use 'console') http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index 4be3299..1b9ee48 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -249,7 +249,8 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with Befor val components = JobManager.createJobManagerComponents( config, executor, - executor) + executor, + highAvailabilityServices.createBlobStore()) // Start the JobManager without a MetricRegistry so that we don't start the MetricQueryService. // The problem of the MetricQueryService is that it starts an actor with a fixed name. Thus, http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index 09dc5ed..1db0a85 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -19,9 +19,9 @@ package org.apache.flink.runtime.testingUtils import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.MetricRegistry import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration @@ -32,15 +32,15 @@ import scala.language.postfixOps /** Subclass of the [[TaskManager]] to support testing messages */ class TestingTaskManager( - config: TaskManagerConfiguration, - resourceID: ResourceID, - connectionInfo: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, - metricRegistry : MetricRegistry) + config: TaskManagerConfiguration, + resourceID: ResourceID, + connectionInfo: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + highAvailabilityServices: HighAvailabilityServices, + metricRegistry : MetricRegistry) extends TaskManager( config, resourceID, @@ -49,19 +49,19 @@ class TestingTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) with TestingTaskManagerLike { def this( - config: TaskManagerConfiguration, - connectionInfo: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, - metricRegistry : MetricRegistry) { + config: TaskManagerConfiguration, + connectionInfo: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + highAvailabilityServices: HighAvailabilityServices, + metricRegistry : MetricRegistry) { this( config, ResourceID.generate(), @@ -70,7 +70,7 @@ class TestingTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) } } http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java index 5f9d178..2983d66 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java @@ -155,6 +155,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger { Configuration config = new Configuration(); config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, FileStateBackendBasePath.getAbsolutePath()); ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( "leader", 1, config); http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala index 0f82faa..1df4b8d 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala @@ -19,6 +19,7 @@ package org.apache.flink.yarn import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService @@ -40,19 +41,19 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike * @param ioManager IOManager responsible for I/O * @param network NetworkEnvironment for this actor * @param numberOfSlots Number of slots for this TaskManager - * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading - * JobManager + * @param highAvailabilityServices [[HighAvailabilityServices]] to create a leader retrieval + * service for retrieving the leading JobManager */ class TestingYarnTaskManager( - config: TaskManagerConfiguration, - resourceID: ResourceID, - connectionInfo: TaskManagerLocation, - memoryManager: MemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, - metricRegistry : MetricRegistry) + config: TaskManagerConfiguration, + resourceID: ResourceID, + connectionInfo: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + highAvailabilityServices: HighAvailabilityServices, + metricRegistry : MetricRegistry) extends YarnTaskManager( config, resourceID, @@ -61,7 +62,7 @@ class TestingYarnTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) with TestingTaskManagerLike { http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java index e9c3904..f81d040 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/highavailability/YarnHighAvailabilityServices.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobStore; +import org.apache.flink.runtime.blob.BlobStoreService; import org.apache.flink.runtime.blob.FileSystemBlobStore; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -91,6 +92,9 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe * HA services clean up */ protected final Path haDataDirectory; + /** Blob store service to be used for the BlobServer and BlobCache */ + protected final BlobStoreService blobStoreService; + /** Flag marking this instance as shut down */ private volatile boolean closed; @@ -153,6 +157,8 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe } LOG.info("Flink YARN application will store recovery data at {}", haDataDirectory); + + blobStoreService = new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString()); } // ------------------------------------------------------------------------ @@ -163,7 +169,7 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe public BlobStore createBlobStore() throws IOException { enter(); try { - return new FileSystemBlobStore(flinkFileSystem, haDataDirectory.toString()); + return blobStoreService; } finally { exit(); } @@ -192,11 +198,23 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe } closed = true; + Throwable exception = null; + + try { + blobStoreService.close(); + } catch (Throwable t) { + exception = t; + } + // we do not propagate exceptions here, but only log them try { hadoopFileSystem.close(); } catch (Throwable t) { - LOG.warn("Error closing Hadoop FileSystem", t); + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + + if (exception != null) { + ExceptionUtils.rethrowException(exception, "Could not properly close the YarnHighAvailabilityServices."); } } finally { @@ -213,12 +231,18 @@ public abstract class YarnHighAvailabilityServices implements HighAvailabilitySe // we remember exceptions only, then continue cleanup, and re-throw at the end Throwable exception = null; + try { + blobStoreService.closeAndCleanupAllData(); + } catch (Throwable t) { + exception = t; + } + // first, we delete all data in Flink's data directory try { flinkFileSystem.delete(haDataDirectory, true); } catch (Throwable t) { - exception = t; + exception = ExceptionUtils.firstOrSuppressed(t, exception); } // now we actually close the services http://git-wip-us.apache.org/repos/asf/flink/blob/88b0f2ac/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala index be31085..b7f4c9a 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala @@ -19,9 +19,9 @@ package org.apache.flink.yarn import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} import org.apache.flink.runtime.metrics.MetricRegistry @@ -38,7 +38,7 @@ class YarnTaskManager( ioManager: IOManager, network: NetworkEnvironment, numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService, + highAvailabilityServices: HighAvailabilityServices, metricRegistry : MetricRegistry) extends TaskManager( config, @@ -48,7 +48,7 @@ class YarnTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService, + highAvailabilityServices, metricRegistry) { override def handleMessage: Receive = {