Repository: flink
Updated Branches:
  refs/heads/master 90834ca70 -> 6151d707c


[FLINK-1492] [FLINK-1513] Fix BLOB service shutdown message and avoid global 
configuration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6151d707
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6151d707
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6151d707

Branch: refs/heads/master
Commit: 6151d707c6130a38d18501414447cfa88969d9b8
Parents: 69b7945
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 11 19:08:21 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 20:42:43 2015 +0100

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      |  2 +-
 .../apache/flink/runtime/blob/BlobCache.java    | 53 +++++++++------
 .../flink/runtime/blob/BlobInputStream.java     |  8 ---
 .../apache/flink/runtime/blob/BlobServer.java   | 70 ++++++++++++--------
 .../apache/flink/runtime/blob/BlobService.java  |  2 +-
 .../apache/flink/runtime/blob/BlobUtils.java    | 40 +++++------
 .../flink/runtime/jobmanager/JobManager.scala   |  3 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  5 +-
 .../taskmanager/TaskManagerConfiguration.scala  |  6 +-
 .../flink/runtime/blob/BlobCacheTest.java       | 17 ++---
 .../flink/runtime/blob/BlobClientTest.java      | 11 +--
 .../flink/runtime/blob/BlobUtilsTest.java       | 21 +-----
 .../BlobLibraryCacheManagerTest.java            |  8 +--
 13 files changed, 120 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java 
b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index fdd3c95..a1515d0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -366,7 +366,7 @@ public class Configuration implements IOReadableWritable, 
java.io.Serializable,
                if (o == null) {
                        return defaultValue;
                }
-               else if (o.getClass() == byte[].class) {
+               else if (o.getClass().equals(byte[].class)) {
                        return (byte[]) o;
                }
                else {

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index d0d9a45..40ec4e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,8 +37,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * The BLOB cache implements a local cache for content-addressable BLOBs. When 
requesting BLOBs through the
  * {@link BlobCache#getURL} methods, the BLOB cache will first attempt serve 
the file from its local cache. Only if the
  * local cache does not contain the desired BLOB, the BLOB cache will try to 
download it from the BLOB server.
- * <p>
- * This class is thread-safe.
  */
 public final class BlobCache implements BlobService {
 
@@ -49,16 +49,21 @@ public final class BlobCache implements BlobService {
 
        private final File storageDir;
 
-       private AtomicBoolean shutdownRequested = new AtomicBoolean();
+       private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
        /** Shutdown hook thread to ensure deletion of the storage directory. */
        private final Thread shutdownHook;
 
-       public BlobCache(InetSocketAddress serverAddress) {
-               this.serverAddress = serverAddress;
 
-               this.storageDir = BlobUtils.initStorageDirectory();
+       public BlobCache(InetSocketAddress serverAddress, Configuration 
configuration) {
+               if (serverAddress == null || configuration == null) {
+                       throw new NullPointerException();
+               }
+
+               this.serverAddress = serverAddress;
 
+               String storageDirectory = 
configuration.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
+               this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
                LOG.info("Created BLOB cache storage directory " + storageDir);
 
                // Add shutdown hook to delete storage directory
@@ -77,7 +82,6 @@ public final class BlobCache implements BlobService {
         *         thrown if an I/O error occurs while downloading the BLOBs 
from the BLOB server
         */
        public URL getURL(final BlobKey requiredBlob) throws IOException {
-
                if (requiredBlob == null) {
                        throw new IllegalArgumentException("Required BLOB 
cannot be null.");
                }
@@ -95,16 +99,8 @@ public final class BlobCache implements BlobService {
                                        LOG.debug("Trying to download " + 
requiredBlob + " from " + serverAddress);
                                }
 
-                               if (bc == null) {
-
-                                       if (serverAddress == null) {
-                                               throw new 
IllegalArgumentException(
-                                                       "Argument serverAddress 
is null: Cannot download libraries from BLOB server");
-                                       }
-
-                                       bc = new BlobClient(serverAddress);
-                                       buf = new byte[BlobServer.BUFFER_SIZE];
-                               }
+                               bc = new BlobClient(serverAddress);
+                               buf = new byte[BlobServer.BUFFER_SIZE];
 
                                InputStream is = null;
                                OutputStream os = null;
@@ -160,11 +156,28 @@ public final class BlobCache implements BlobService {
        }
 
        @Override
-       public void shutdown() throws IOException {
+       public void shutdown() {
                if (shutdownRequested.compareAndSet(false, true)) {
-                       FileUtils.deleteDirectory(storageDir);
+                       // Clean up the storage directory
+                       try {
+                               FileUtils.deleteDirectory(storageDir);
+                       }
+                       catch (IOException e) {
+                               LOG.error("BLOB cache failed to properly clean 
up its storage directory.");
+                       }
 
-                       Runtime.getRuntime().removeShutdownHook(shutdownHook);
+                       // Remove shutdown hook to prevent resource leaks, 
unless this is invoked by the shutdown hook itself
+                       if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
+                               try {
+                                       
Runtime.getRuntime().removeShutdownHook(shutdownHook);
+                               }
+                               catch (IllegalStateException e) {
+                                       // race, JVM is in shutdown already, we 
can safely ignore this
+                               }
+                               catch (Throwable t) {
+                                       LOG.warn("Exception while unregistering 
BLOB cache's cleanup shutdown hook.");
+                               }
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
index f93fd50..3654f8f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java
@@ -69,7 +69,6 @@ final class BlobInputStream extends InputStream {
         *         throws if an I/O error occurs while reading the BLOB data 
from the BLOB server
         */
        BlobInputStream(final InputStream wrappedInputStream, final BlobKey 
blobKey, final byte[] buf) throws IOException {
-
                this.wrappedInputStream = wrappedInputStream;
                this.blobKey = blobKey;
                this.bytesToReceive = BlobServer.readLength(buf, 
wrappedInputStream);
@@ -93,7 +92,6 @@ final class BlobInputStream extends InputStream {
 
        @Override
        public int read() throws IOException {
-
                if (this.bytesReceived == this.bytesToReceive) {
                        return -1;
                }
@@ -125,7 +123,6 @@ final class BlobInputStream extends InputStream {
 
        @Override
        public int read(byte[] b, int off, int len) throws IOException {
-
                final int bytesMissing = this.bytesToReceive - 
this.bytesReceived;
 
                if (bytesMissing == 0) {
@@ -155,13 +152,11 @@ final class BlobInputStream extends InputStream {
 
        @Override
        public long skip(long n) throws IOException {
-
                return 0L;
        }
 
        @Override
        public int available() throws IOException {
-
                return 0;
        }
 
@@ -171,19 +166,16 @@ final class BlobInputStream extends InputStream {
        }
 
        public void mark(final int readlimit) {
-
                // Do not do anything here
        }
 
        @Override
        public void reset() throws IOException {
-
                throw new IOException("mark/reset not supported");
        }
 
        @Override
        public boolean markSupported() {
-
                return false;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 068a859..220d3a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobID;
 
 import org.slf4j.Logger;
@@ -39,10 +41,8 @@ import org.slf4j.LoggerFactory;
  * This class implements the BLOB server. The BLOB server is responsible for 
listening for incoming requests and
  * spawning threads to handle these requests. Furthermore, it takes care of 
creating the directory structure to store
  * the BLOBs or temporarily cache them.
- * <p>
- * This class is thread-safe.
  */
-public final class BlobServer extends Thread implements BlobService{
+public final class BlobServer extends Thread implements BlobService {
 
        /**
         * The log object used for debugging.
@@ -103,7 +103,15 @@ public final class BlobServer extends Thread implements 
BlobService{
         * @throws IOException
         *         thrown if the BLOB server cannot bind to a free network port
         */
-       public BlobServer() throws IOException {
+       public BlobServer(Configuration config) throws IOException {
+
+               String storageDirectory = 
config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
+               this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
+               LOG.info("Created BLOB server storage directory {}", 
storageDir);
+
+               // Add shutdown hook to delete storage directory
+               this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+
                try {
                        this.serverSocket = new ServerSocket(0);
 
@@ -113,12 +121,6 @@ public final class BlobServer extends Thread implements 
BlobService{
                                LOG.info(String.format("Started BLOB server on 
port %d",
                                                
this.serverSocket.getLocalPort()));
                        }
-
-                       this.storageDir = BlobUtils.initStorageDirectory();
-
-                       LOG.info("Created BLOB server storage directory " + 
storageDir);
-
-                       shutdownHook = BlobUtils.addShutdownHook(this, LOG);
                }
                catch (IOException e) {
                        throw new IOException("Could not create BlobServer with 
random port.", e);
@@ -132,7 +134,6 @@ public final class BlobServer extends Thread implements 
BlobService{
         * @return the network port the BLOB server is bound to
         */
        public int getServerPort() {
-
                return this.serverSocket.getLocalPort();
        }
 
@@ -143,7 +144,7 @@ public final class BlobServer extends Thread implements 
BlobService{
         * @param key identifying the file
         * @return file handle to the file
         */
-       public File getStorageLocation(BlobKey key){
+       public File getStorageLocation(BlobKey key) {
                return BlobUtils.getStorageLocation(storageDir, key);
        }
 
@@ -154,7 +155,7 @@ public final class BlobServer extends Thread implements 
BlobService{
         * @param key to identify the file within the job context
         * @return file handle to the file
         */
-       public File getStorageLocation(JobID jobID, String key){
+       public File getStorageLocation(JobID jobID, String key) {
                return BlobUtils.getStorageLocation(storageDir, jobID, key);
        }
 
@@ -164,7 +165,7 @@ public final class BlobServer extends Thread implements 
BlobService{
         * @param jobID all files associated to this jobID will be deleted
         * @throws IOException
         */
-       public void deleteJobDirectory(JobID jobID) throws IOException{
+       public void deleteJobDirectory(JobID jobID) throws IOException {
                BlobUtils.deleteJobDirectory(storageDir, jobID);
        }
 
@@ -174,22 +175,21 @@ public final class BlobServer extends Thread implements 
BlobService{
         * @return a temporary file inside the BLOB server's incoming directory
         */
        File getTemporaryFilename() {
-               return new File(BlobUtils.getIncomingDirectory(storageDir), 
String.format("temp-%08d",
-                               tempFileCounter.getAndIncrement()));
+               return new File(BlobUtils.getIncomingDirectory(storageDir),
+                               String.format("temp-%08d", 
tempFileCounter.getAndIncrement()));
        }
 
        @Override
        public void run() {
-
                try {
-
                        while (!this.shutdownRequested.get()) {
                                new BlobConnection(this.serverSocket.accept(), 
this).start();
                        }
-
-               } catch (IOException ioe) {
-                       if (!this.shutdownRequested.get() && 
LOG.isErrorEnabled()) {
-                               LOG.error("Blob server stopped working.", ioe);
+               }
+               catch (Throwable t) {
+                       if (!this.shutdownRequested.get()) {
+                               LOG.error("BLOB server stopped working. 
Shutting down", t);
+                               shutdown();
                        }
                }
        }
@@ -198,7 +198,7 @@ public final class BlobServer extends Thread implements 
BlobService{
         * Shuts down the BLOB server.
         */
        @Override
-       public void shutdown() throws IOException {
+       public void shutdown() {
                if (shutdownRequested.compareAndSet(false, true)) {
                        try {
                                this.serverSocket.close();
@@ -214,12 +214,26 @@ public final class BlobServer extends Thread implements 
BlobService{
                        }
 
                        // Clean up the storage directory
-                       FileUtils.deleteDirectory(storageDir);
-
-                       // Remove shutdown hook to prevent resource leaks
-                       Runtime.getRuntime().removeShutdownHook(shutdownHook);
+                       try {
+                               FileUtils.deleteDirectory(storageDir);
+                       }
+                       catch (IOException e) {
+                               LOG.error("BLOB server failed to properly clean 
up its storage directory.");
+                       }
 
-                       // TODO: Find/implement strategy to handle 
content-addressable BLOBs
+                       // Remove shutdown hook to prevent resource leaks, 
unless this is invoked by the
+                       // shutdown hook itself
+                       if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
+                               try {
+                                       
Runtime.getRuntime().removeShutdownHook(shutdownHook);
+                               }
+                               catch (IllegalStateException e) {
+                                       // race, JVM is in shutdown already, we 
can safely ignore this
+                               }
+                               catch (Throwable t) {
+                                       LOG.warn("Exception while unregistering 
BLOB server's cleanup shutdown hook.");
+                               }
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index b6ed249..148476f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -49,5 +49,5 @@ public interface BlobService {
         * Shutdown method which is called to terminate the blob service.
         * @throws IOException
         */
-       void shutdown() throws IOException;
+       void shutdown();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 476f481..53cab1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -20,8 +20,6 @@ package org.apache.flink.runtime.blob;
 
 import com.google.common.io.BaseEncoding;
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.slf4j.Logger;
 
@@ -61,15 +59,10 @@ public class BlobUtils {
         *
         * @return the storage directory used by a BLOB service
         */
-       static File initStorageDirectory() {
-               File baseDir;
-               String sd = GlobalConfiguration.getString(
-                               ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, 
null);
-               if (sd != null) {
-                       baseDir = new File(sd);
-               } else {
-                       baseDir = new 
File(System.getProperty("java.io.tmpdir"));
-               }
+       static File initStorageDirectory(String storageDirectory) {
+               File baseDir = storageDirectory != null ?
+                               new File(storageDirectory) :
+                               new File(System.getProperty("java.io.tmpdir"));
 
                File storageDir;
                final int MAX_ATTEMPTS = 10;
@@ -87,7 +80,7 @@ public class BlobUtils {
                }
 
                // max attempts exceeded to find a storage directory
-               throw new RuntimeException("Could not create storage directory 
in '" + baseDir + "'.");
+               throw new RuntimeException("Could not create storage directory 
for BLOB store in '" + baseDir + "'.");
        }
 
        /**
@@ -139,9 +132,7 @@ public class BlobUtils {
         *        the key of the BLOB
         * @return the (designated) physical storage location of the BLOB with 
the given job ID and key
         */
-       static File getStorageLocation(final File storageDir, final JobID jobID,
-                                                                       final 
String key) {
-
+       static File getStorageLocation(final File storageDir, final JobID 
jobID, final String key) {
                return new File(getJobDirectory(storageDir, jobID), 
BLOB_FILE_PREFIX + encodeKey(key));
        }
 
@@ -192,7 +183,7 @@ public class BlobUtils {
                try {
                        return MessageDigest.getInstance(HASHING_ALGORITHM);
                } catch (NoSuchAlgorithmException e) {
-                       throw new RuntimeException(e);
+                       throw new RuntimeException("Cannot instantiate the 
message digest algorithm " + HASHING_ALGORITHM, e);
                }
        }
 
@@ -215,9 +206,18 @@ public class BlobUtils {
                        }
                });
 
-               // Add JVM shutdown hook to call shutdown of service
-               Runtime.getRuntime().addShutdownHook(shutdownHook);
-
-               return shutdownHook;
+               try {
+                       // Add JVM shutdown hook to call shutdown of service
+                       Runtime.getRuntime().addShutdownHook(shutdownHook);
+                       return shutdownHook;
+               }
+               catch (IllegalStateException e) {
+                       // JVM is already shutting down. no need to do our work
+                       return null;
+               }
+               catch (Throwable t) {
+                       logger.error("Cannot register shutdown hook that 
cleanly terminates the BLOB service.");
+                       return null;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b20fd01..4636999 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -113,7 +113,8 @@ Actor with ActorLogMessages with ActorLogging {
   val accumulatorManager = new AccumulatorManager(Math.min(1, archiveCount))
   val instanceManager = new InstanceManager()
   val scheduler = new FlinkScheduler()
-  val libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(), 
cleanupInterval)
+  val libraryCacheManager = new BlobLibraryCacheManager(
+                                        new BlobServer(configuration), 
cleanupInterval)
 
   // List of current jobs running
   val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, 
JobInfo)]()

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index ceb070f..1bfe172 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -582,7 +582,8 @@ import scala.collection.JavaConverters._
 
       log.info("Determined BLOB server address to be {}.", address)
 
-      libraryCacheManager = new BlobLibraryCacheManager(new 
BlobCache(address), cleanupInterval)
+      libraryCacheManager = new BlobLibraryCacheManager(
+                                     new BlobCache(address, configuration), 
cleanupInterval)
     } else {
       libraryCacheManager = new FallbackLibraryCacheManager
     }
@@ -871,7 +872,7 @@ object TaskManager {
 
     val taskManagerConfig = TaskManagerConfiguration(numberOfSlots, 
memorySize, pageSize,
       tmpDirs, cleanupInterval, memoryLoggingIntervalMs, profilingInterval, 
timeout,
-      maxRegistrationDuration)
+      maxRegistrationDuration, configuration)
 
     (connectionInfo, jobManagerURL, taskManagerConfig, networkConfig)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
index 82cbe9e..8c1217e 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManagerConfiguration.scala
@@ -18,10 +18,14 @@
 
 package org.apache.flink.runtime.taskmanager
 
+import org.apache.flink.configuration.Configuration
+
 import scala.concurrent.duration.{Duration, FiniteDuration}
 
 case class TaskManagerConfiguration(numberOfSlots: Int, memorySize: Long, 
pageSize: Int,
                                     tmpDirPaths: Array[String], 
cleanupInterval: Long,
                                     memoryLogggingIntervalMs: Option[Long],
                                     profilingInterval: Option[Long],
-                                    timeout: FiniteDuration, 
maxRegistrationDuration: Duration)
+                                    timeout: FiniteDuration,
+                                    maxRegistrationDuration: Duration,
+                                    configuration: Configuration)

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
index d456135..32c8c3a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
@@ -31,6 +31,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.configuration.Configuration;
 import org.junit.Test;
 
 /**
@@ -50,7 +51,7 @@ public class BlobCacheTest {
                try {
 
                        // Start the BLOB server
-                       blobServer = new BlobServer();
+                       blobServer = new BlobServer(new Configuration());
                        final InetSocketAddress serverAddress = new 
InetSocketAddress(blobServer.getServerPort());
 
                        // Upload BLOBs
@@ -68,7 +69,7 @@ public class BlobCacheTest {
                                }
                        }
 
-                       blobCache = new BlobCache(serverAddress);
+                       blobCache = new BlobCache(serverAddress, new 
Configuration());
 
                        for(int i = 0; i < blobKeys.size(); i++){
                                blobCache.getURL(blobKeys.get(i));
@@ -109,19 +110,11 @@ public class BlobCacheTest {
                        fail(ioe.getMessage());
                } finally {
                        if (blobServer != null) {
-                               try {
-                                       blobServer.shutdown();
-                               } catch (IOException e) {
-                                       e.printStackTrace();
-                               }
+                               blobServer.shutdown();
                        }
 
                        if(blobCache != null){
-                               try {
-                                       blobCache.shutdown();
-                               } catch (IOException e) {
-                                       e.printStackTrace();
-                               }
+                               blobCache.shutdown();
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 2542bbb..1465777 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -31,6 +31,7 @@ import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.security.MessageDigest;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.util.StringUtils;
 import org.junit.AfterClass;
@@ -57,9 +58,8 @@ public class BlobClientTest {
         */
        @BeforeClass
        public static void startServer() {
-
                try {
-                       BLOB_SERVER = new BlobServer();
+                       BLOB_SERVER = new BlobServer(new Configuration());
                } catch (IOException ioe) {
                        fail(StringUtils.stringifyException(ioe));
                }
@@ -71,13 +71,8 @@ public class BlobClientTest {
         */
        @AfterClass
        public static void stopServer() {
-
                if (BLOB_SERVER != null) {
-                       try {
-                               BLOB_SERVER.shutdown();
-                       } catch (IOException e) {
-                               e.printStackTrace();
-                       }
+                       BLOB_SERVER.shutdown();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
index 15a63b3..a5c83b9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobUtilsTest.java
@@ -18,33 +18,18 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
+import static org.mockito.Mockito.mock;
+
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.File;
 
-import static org.mockito.Mockito.mock;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(BlobKey.class)
 public class BlobUtilsTest {
 
        @Test(expected = Exception.class)
        public void testExceptionOnCreateStorageDirectoryFailure() {
-
-               // Configure a non existing directory
-               Configuration config = new Configuration();
-               config.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, 
"/cannot-create-this");
-
-               GlobalConfiguration.includeConfiguration(config);
-
                // Should throw an Exception
-               BlobUtils.initStorageDirectory();
+               BlobUtils.initStorageDirectory("/cannot-create-this");
        }
 
        @Test(expected = Exception.class)

http://git-wip-us.apache.org/repos/asf/flink/blob/6151d707/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 71c0669..2675346 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -51,7 +51,7 @@ public class BlobLibraryCacheManagerTest {
                final byte[] buf = new byte[128];
 
                try {
-                       server = new BlobServer();
+                       server = new BlobServer(new Configuration());
                        InetSocketAddress blobSocketAddress = new 
InetSocketAddress(server.getServerPort());
                        BlobClient bc = new BlobClient(blobSocketAddress);
 
@@ -109,11 +109,7 @@ public class BlobLibraryCacheManagerTest {
                }
                finally{
                        if (server != null){
-                               try {
-                                       server.shutdown();
-                               } catch (IOException e) {
-                                       e.printStackTrace();
-                               }
+                               server.shutdown();
                        }
 
                        if (libraryCacheManager != null){

Reply via email to