Repository: flink Updated Branches: refs/heads/master 90834ca70 -> 6151d707c
[FLINK-1492] [FLINK-1513] Fix BLOB service shutdown message and avoid global configuration Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6151d707 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6151d707 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6151d707 Branch: refs/heads/master Commit: 6151d707c6130a38d18501414447cfa88969d9b8 Parents: 69b7945 Author: Stephan Ewen <se...@apache.org> Authored: Wed Feb 11 19:08:21 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 20:42:43 2015 +0100 ---------------------------------------------------------------------- .../flink/configuration/Configuration.java | 2 +- .../apache/flink/runtime/blob/BlobCache.java | 53 +++++++++------ .../flink/runtime/blob/BlobInputStream.java | 8 --- .../apache/flink/runtime/blob/BlobServer.java | 70 ++++++++++++-------- .../apache/flink/runtime/blob/BlobService.java | 2 +- .../apache/flink/runtime/blob/BlobUtils.java | 40 +++++------ .../flink/runtime/jobmanager/JobManager.scala | 3 +- .../flink/runtime/taskmanager/TaskManager.scala | 5 +- .../taskmanager/TaskManagerConfiguration.scala | 6 +- .../flink/runtime/blob/BlobCacheTest.java | 17 ++--- .../flink/runtime/blob/BlobClientTest.java | 11 +-- .../flink/runtime/blob/BlobUtilsTest.java | 21 +----- .../BlobLibraryCacheManagerTest.java | 8 +-- 13 files changed, 120 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java index fdd3c95..a1515d0 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java @@ -366,7 +366,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable, if (o == null) { return defaultValue; } - else if (o.getClass() == byte[].class) { + else if (o.getClass().equals(byte[].class)) { return (byte[]) o; } else { http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index d0d9a45..40ec4e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.blob; import org.apache.commons.io.FileUtils; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,8 +37,6 @@ import java.util.concurrent.atomic.AtomicBoolean; * The BLOB cache implements a local cache for content-addressable BLOBs. When requesting BLOBs through the * {@link BlobCache#getURL} methods, the BLOB cache will first attempt serve the file from its local cache. Only if the * local cache does not contain the desired BLOB, the BLOB cache will try to download it from the BLOB server. - * <p> - * This class is thread-safe. */ public final class BlobCache implements BlobService { @@ -49,16 +49,21 @@ public final class BlobCache implements BlobService { private final File storageDir; - private AtomicBoolean shutdownRequested = new AtomicBoolean(); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); /** Shutdown hook thread to ensure deletion of the storage directory. */ private final Thread shutdownHook; - public BlobCache(InetSocketAddress serverAddress) { - this.serverAddress = serverAddress; - this.storageDir = BlobUtils.initStorageDirectory(); + public BlobCache(InetSocketAddress serverAddress, Configuration configuration) { + if (serverAddress == null || configuration == null) { + throw new NullPointerException(); + } + + this.serverAddress = serverAddress; + String storageDirectory = configuration.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); + this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); LOG.info("Created BLOB cache storage directory " + storageDir); // Add shutdown hook to delete storage directory @@ -77,7 +82,6 @@ public final class BlobCache implements BlobService { * thrown if an I/O error occurs while downloading the BLOBs from the BLOB server */ public URL getURL(final BlobKey requiredBlob) throws IOException { - if (requiredBlob == null) { throw new IllegalArgumentException("Required BLOB cannot be null."); } @@ -95,16 +99,8 @@ public final class BlobCache implements BlobService { LOG.debug("Trying to download " + requiredBlob + " from " + serverAddress); } - if (bc == null) { - - if (serverAddress == null) { - throw new IllegalArgumentException( - "Argument serverAddress is null: Cannot download libraries from BLOB server"); - } - - bc = new BlobClient(serverAddress); - buf = new byte[BlobServer.BUFFER_SIZE]; - } + bc = new BlobClient(serverAddress); + buf = new byte[BlobServer.BUFFER_SIZE]; InputStream is = null; OutputStream os = null; @@ -160,11 +156,28 @@ public final class BlobCache implements BlobService { } @Override - public void shutdown() throws IOException { + public void shutdown() { if (shutdownRequested.compareAndSet(false, true)) { - FileUtils.deleteDirectory(storageDir); + // Clean up the storage directory + try { + FileUtils.deleteDirectory(storageDir); + } + catch (IOException e) { + LOG.error("BLOB cache failed to properly clean up its storage directory."); + } - Runtime.getRuntime().removeShutdownHook(shutdownHook); + // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself + if (shutdownHook != null && shutdownHook != Thread.currentThread()) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + } + catch (Throwable t) { + LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook."); + } + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java index f93fd50..3654f8f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java @@ -69,7 +69,6 @@ final class BlobInputStream extends InputStream { * throws if an I/O error occurs while reading the BLOB data from the BLOB server */ BlobInputStream(final InputStream wrappedInputStream, final BlobKey blobKey, final byte[] buf) throws IOException { - this.wrappedInputStream = wrappedInputStream; this.blobKey = blobKey; this.bytesToReceive = BlobServer.readLength(buf, wrappedInputStream); @@ -93,7 +92,6 @@ final class BlobInputStream extends InputStream { @Override public int read() throws IOException { - if (this.bytesReceived == this.bytesToReceive) { return -1; } @@ -125,7 +123,6 @@ final class BlobInputStream extends InputStream { @Override public int read(byte[] b, int off, int len) throws IOException { - final int bytesMissing = this.bytesToReceive - this.bytesReceived; if (bytesMissing == 0) { @@ -155,13 +152,11 @@ final class BlobInputStream extends InputStream { @Override public long skip(long n) throws IOException { - return 0L; } @Override public int available() throws IOException { - return 0; } @@ -171,19 +166,16 @@ final class BlobInputStream extends InputStream { } public void mark(final int readlimit) { - // Do not do anything here } @Override public void reset() throws IOException { - throw new IOException("mark/reset not supported"); } @Override public boolean markSupported() { - return false; } } http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 068a859..220d3a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobID; import org.slf4j.Logger; @@ -39,10 +41,8 @@ import org.slf4j.LoggerFactory; * This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and * spawning threads to handle these requests. Furthermore, it takes care of creating the directory structure to store * the BLOBs or temporarily cache them. - * <p> - * This class is thread-safe. */ -public final class BlobServer extends Thread implements BlobService{ +public final class BlobServer extends Thread implements BlobService { /** * The log object used for debugging. @@ -103,7 +103,15 @@ public final class BlobServer extends Thread implements BlobService{ * @throws IOException * thrown if the BLOB server cannot bind to a free network port */ - public BlobServer() throws IOException { + public BlobServer(Configuration config) throws IOException { + + String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); + this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); + LOG.info("Created BLOB server storage directory {}", storageDir); + + // Add shutdown hook to delete storage directory + this.shutdownHook = BlobUtils.addShutdownHook(this, LOG); + try { this.serverSocket = new ServerSocket(0); @@ -113,12 +121,6 @@ public final class BlobServer extends Thread implements BlobService{ LOG.info(String.format("Started BLOB server on port %d", this.serverSocket.getLocalPort())); } - - this.storageDir = BlobUtils.initStorageDirectory(); - - LOG.info("Created BLOB server storage directory " + storageDir); - - shutdownHook = BlobUtils.addShutdownHook(this, LOG); } catch (IOException e) { throw new IOException("Could not create BlobServer with random port.", e); @@ -132,7 +134,6 @@ public final class BlobServer extends Thread implements BlobService{ * @return the network port the BLOB server is bound to */ public int getServerPort() { - return this.serverSocket.getLocalPort(); } @@ -143,7 +144,7 @@ public final class BlobServer extends Thread implements BlobService{ * @param key identifying the file * @return file handle to the file */ - public File getStorageLocation(BlobKey key){ + public File getStorageLocation(BlobKey key) { return BlobUtils.getStorageLocation(storageDir, key); } @@ -154,7 +155,7 @@ public final class BlobServer extends Thread implements BlobService{ * @param key to identify the file within the job context * @return file handle to the file */ - public File getStorageLocation(JobID jobID, String key){ + public File getStorageLocation(JobID jobID, String key) { return BlobUtils.getStorageLocation(storageDir, jobID, key); } @@ -164,7 +165,7 @@ public final class BlobServer extends Thread implements BlobService{ * @param jobID all files associated to this jobID will be deleted * @throws IOException */ - public void deleteJobDirectory(JobID jobID) throws IOException{ + public void deleteJobDirectory(JobID jobID) throws IOException { BlobUtils.deleteJobDirectory(storageDir, jobID); } @@ -174,22 +175,21 @@ public final class BlobServer extends Thread implements BlobService{ * @return a temporary file inside the BLOB server's incoming directory */ File getTemporaryFilename() { - return new File(BlobUtils.getIncomingDirectory(storageDir), String.format("temp-%08d", - tempFileCounter.getAndIncrement())); + return new File(BlobUtils.getIncomingDirectory(storageDir), + String.format("temp-%08d", tempFileCounter.getAndIncrement())); } @Override public void run() { - try { - while (!this.shutdownRequested.get()) { new BlobConnection(this.serverSocket.accept(), this).start(); } - - } catch (IOException ioe) { - if (!this.shutdownRequested.get() && LOG.isErrorEnabled()) { - LOG.error("Blob server stopped working.", ioe); + } + catch (Throwable t) { + if (!this.shutdownRequested.get()) { + LOG.error("BLOB server stopped working. Shutting down", t); + shutdown(); } } } @@ -198,7 +198,7 @@ public final class BlobServer extends Thread implements BlobService{ * Shuts down the BLOB server. */ @Override - public void shutdown() throws IOException { + public void shutdown() { if (shutdownRequested.compareAndSet(false, true)) { try { this.serverSocket.close(); @@ -214,12 +214,26 @@ public final class BlobServer extends Thread implements BlobService{ } // Clean up the storage directory - FileUtils.deleteDirectory(storageDir); - - // Remove shutdown hook to prevent resource leaks - Runtime.getRuntime().removeShutdownHook(shutdownHook); + try { + FileUtils.deleteDirectory(storageDir); + } + catch (IOException e) { + LOG.error("BLOB server failed to properly clean up its storage directory."); + } - // TODO: Find/implement strategy to handle content-addressable BLOBs + // Remove shutdown hook to prevent resource leaks, unless this is invoked by the + // shutdown hook itself + if (shutdownHook != null && shutdownHook != Thread.currentThread()) { + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + } + catch (Throwable t) { + LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook."); + } + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java index b6ed249..148476f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java @@ -49,5 +49,5 @@ public interface BlobService { * Shutdown method which is called to terminate the blob service. * @throws IOException */ - void shutdown() throws IOException; + void shutdown(); } http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index 476f481..53cab1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -20,8 +20,6 @@ package org.apache.flink.runtime.blob; import com.google.common.io.BaseEncoding; import org.apache.commons.io.FileUtils; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.jobgraph.JobID; import org.slf4j.Logger; @@ -61,15 +59,10 @@ public class BlobUtils { * * @return the storage directory used by a BLOB service */ - static File initStorageDirectory() { - File baseDir; - String sd = GlobalConfiguration.getString( - ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); - if (sd != null) { - baseDir = new File(sd); - } else { - baseDir = new File(System.getProperty("java.io.tmpdir")); - } + static File initStorageDirectory(String storageDirectory) { + File baseDir = storageDirectory != null ? + new File(storageDirectory) : + new File(System.getProperty("java.io.tmpdir")); File storageDir; final int MAX_ATTEMPTS = 10; @@ -87,7 +80,7 @@ public class BlobUtils { } // max attempts exceeded to find a storage directory - throw new RuntimeException("Could not create storage directory in '" + baseDir + "'."); + throw new RuntimeException("Could not create storage directory for BLOB store in '" + baseDir + "'."); } /** @@ -139,9 +132,7 @@ public class BlobUtils { * the key of the BLOB * @return the (designated) physical storage location of the BLOB with the given job ID and key */ - static File getStorageLocation(final File storageDir, final JobID jobID, - final String key) { - + static File getStorageLocation(final File storageDir, final JobID jobID, final String key) { return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key)); } @@ -192,7 +183,7 @@ public class BlobUtils { try { return MessageDigest.getInstance(HASHING_ALGORITHM); } catch (NoSuchAlgorithmException e) { - throw new RuntimeException(e); + throw new RuntimeException("Cannot instantiate the message digest algorithm " + HASHING_ALGORITHM, e); } } @@ -215,9 +206,18 @@ public class BlobUtils { } }); - // Add JVM shutdown hook to call shutdown of service - Runtime.getRuntime().addShutdownHook(shutdownHook); - - return shutdownHook; + try { + // Add JVM shutdown hook to call shutdown of service + Runtime.getRuntime().addShutdownHook(shutdownHook); + return shutdownHook; + } + catch (IllegalStateException e) { + // JVM is already shutting down. no need to do our work + return null; + } + catch (Throwable t) { + logger.error("Cannot register shutdown hook that cleanly terminates the BLOB service."); + return null; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index b20fd01..4636999 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -113,7 +113,8 @@ Actor with ActorLogMessages with ActorLogging { val accumulatorManager = new AccumulatorManager(Math.min(1, archiveCount)) val instanceManager = new InstanceManager() val scheduler = new FlinkScheduler() - val libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(), cleanupInterval) + val libraryCacheManager = new BlobLibraryCacheManager( + new BlobServer(configuration), cleanupInterval) // List of current jobs running val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]() http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index ceb070f..1bfe172 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -582,7 +582,8 @@ import scala.collection.JavaConverters._ log.info("Determined BLOB server address to be {}.", address) - libraryCacheManager = new BlobLibraryCacheManager(new BlobCache(address), cleanupInterval) + libraryCacheManager = new BlobLibraryCacheManager( + new BlobCache(address, configuration), cleanupInterval) } else { libraryCacheManager = new FallbackLibraryCacheManager } @@ -871,7 +872,7 @@ object TaskManager { val taskManagerConfig = TaskManagerConfiguration(numberOfSlots, memorySize, pageSize, tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, timeout, - maxRegistrationDuration) + maxRegistrationDuration, configuration) (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig) } http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala index 82cbe9e..8c1217e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala @@ -18,10 +18,14 @@ package org.apache.flink.runtime.taskmanager +import org.apache.flink.configuration.Configuration + import scala.concurrent.duration.{Duration, FiniteDuration} case class TaskManagerConfiguration(numberOfSlots: Int, memorySize: Long, pageSize: Int, tmpDirPaths: Array[String], cleanupInterval: Long, memoryLogggingIntervalMs: Option[Long], profilingInterval: Option[Long], - timeout: FiniteDuration, maxRegistrationDuration: Duration) + timeout: FiniteDuration, + maxRegistrationDuration: Duration, + configuration: Configuration) http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java index d456135..32c8c3a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java @@ -31,6 +31,7 @@ import java.net.URL; import java.util.ArrayList; import java.util.List; +import org.apache.flink.configuration.Configuration; import org.junit.Test; /** @@ -50,7 +51,7 @@ public class BlobCacheTest { try { // Start the BLOB server - blobServer = new BlobServer(); + blobServer = new BlobServer(new Configuration()); final InetSocketAddress serverAddress = new InetSocketAddress(blobServer.getServerPort()); // Upload BLOBs @@ -68,7 +69,7 @@ public class BlobCacheTest { } } - blobCache = new BlobCache(serverAddress); + blobCache = new BlobCache(serverAddress, new Configuration()); for(int i = 0; i < blobKeys.size(); i++){ blobCache.getURL(blobKeys.get(i)); @@ -109,19 +110,11 @@ public class BlobCacheTest { fail(ioe.getMessage()); } finally { if (blobServer != null) { - try { - blobServer.shutdown(); - } catch (IOException e) { - e.printStackTrace(); - } + blobServer.shutdown(); } if(blobCache != null){ - try { - blobCache.shutdown(); - } catch (IOException e) { - e.printStackTrace(); - } + blobCache.shutdown(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index 2542bbb..1465777 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -31,6 +31,7 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.security.MessageDigest; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.util.StringUtils; import org.junit.AfterClass; @@ -57,9 +58,8 @@ public class BlobClientTest { */ @BeforeClass public static void startServer() { - try { - BLOB_SERVER = new BlobServer(); + BLOB_SERVER = new BlobServer(new Configuration()); } catch (IOException ioe) { fail(StringUtils.stringifyException(ioe)); } @@ -71,13 +71,8 @@ public class BlobClientTest { */ @AfterClass public static void stopServer() { - if (BLOB_SERVER != null) { - try { - BLOB_SERVER.shutdown(); - } catch (IOException e) { - e.printStackTrace(); - } + BLOB_SERVER.shutdown(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java index 15a63b3..a5c83b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java @@ -18,33 +18,18 @@ package org.apache.flink.runtime.blob; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; +import static org.mockito.Mockito.mock; + import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; import java.io.File; -import static org.mockito.Mockito.mock; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(BlobKey.class) public class BlobUtilsTest { @Test(expected = Exception.class) public void testExceptionOnCreateStorageDirectoryFailure() { - - // Configure a non existing directory - Configuration config = new Configuration(); - config.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/cannot-create-this"); - - GlobalConfiguration.includeConfiguration(config); - // Should throw an Exception - BlobUtils.initStorageDirectory(); + BlobUtils.initStorageDirectory("/cannot-create-this"); } @Test(expected = Exception.class) http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 71c0669..2675346 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -51,7 +51,7 @@ public class BlobLibraryCacheManagerTest { final byte[] buf = new byte[128]; try { - server = new BlobServer(); + server = new BlobServer(new Configuration()); InetSocketAddress blobSocketAddress = new InetSocketAddress(server.getServerPort()); BlobClient bc = new BlobClient(blobSocketAddress); @@ -109,11 +109,7 @@ public class BlobLibraryCacheManagerTest { } finally{ if (server != null){ - try { - server.shutdown(); - } catch (IOException e) { - e.printStackTrace(); - } + server.shutdown(); } if (libraryCacheManager != null){