Repository: flink
Updated Branches:
  refs/heads/release-0.8 e01712111 -> 5b420d847


[FLINK-1492] Fix exceptions on blob store shutdown

Conflicts:
        flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
        
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
        flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b420d84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b420d84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b420d84

Branch: refs/heads/release-0.8
Commit: 5b420d8477b2ed1dd72d3bbcade5fd0f72a6110b
Parents: e017121
Author: Ufuk Celebi <[email protected]>
Authored: Mon Feb 9 11:44:47 2015 +0100
Committer: Robert Metzger <[email protected]>
Committed: Tue Feb 10 14:40:35 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobCache.java    | 16 ++++-
 .../apache/flink/runtime/blob/BlobServer.java   | 66 ++++++++++++--------
 .../apache/flink/runtime/blob/BlobUtils.java    | 30 ++++++++-
 3 files changed, 84 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b420d84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 0f57ea3..d0d9a45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -29,6 +29,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * The BLOB cache implements a local cache for content-addressable BLOBs. When 
requesting BLOBs through the
@@ -48,6 +49,10 @@ public final class BlobCache implements BlobService {
 
        private final File storageDir;
 
+       private AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+       /** Shutdown hook thread to ensure deletion of the storage directory. */
+       private final Thread shutdownHook;
 
        public BlobCache(InetSocketAddress serverAddress) {
                this.serverAddress = serverAddress;
@@ -55,6 +60,9 @@ public final class BlobCache implements BlobService {
                this.storageDir = BlobUtils.initStorageDirectory();
 
                LOG.info("Created BLOB cache storage directory " + storageDir);
+
+               // Add shutdown hook to delete storage directory
+               shutdownHook = BlobUtils.addShutdownHook(this, LOG);
        }
 
        /**
@@ -152,7 +160,11 @@ public final class BlobCache implements BlobService {
        }
 
        @Override
-       public void shutdown() throws IOException{
-               FileUtils.deleteDirectory(storageDir);
+       public void shutdown() throws IOException {
+               if (shutdownRequested.compareAndSet(false, true)) {
+                       FileUtils.deleteDirectory(storageDir);
+
+                       Runtime.getRuntime().removeShutdownHook(shutdownHook);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b420d84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 60e1716..068a859 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.ServerSocket;
 import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
@@ -86,7 +87,10 @@ public final class BlobServer extends Thread implements 
BlobService{
        /**
         * Indicates whether a shutdown of server component has been requested.
         */
-       private volatile boolean shutdownRequested = false;
+       private AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+       /** Shutdown hook thread to ensure deletion of the storage directory. */
+       private final Thread shutdownHook;
 
        /**
         * Is the root directory for file storage
@@ -100,18 +104,25 @@ public final class BlobServer extends Thread implements 
BlobService{
         *         thrown if the BLOB server cannot bind to a free network port
         */
        public BlobServer() throws IOException {
+               try {
+                       this.serverSocket = new ServerSocket(0);
 
-               this.serverSocket = new ServerSocket(0);
-               start();
+                       start();
 
-               if (LOG.isInfoEnabled()) {
-                       LOG.info(String.format("Started BLOB server on port %d",
-                               this.serverSocket.getLocalPort()));
-               }
+                       if (LOG.isInfoEnabled()) {
+                               LOG.info(String.format("Started BLOB server on 
port %d",
+                                               
this.serverSocket.getLocalPort()));
+                       }
+
+                       this.storageDir = BlobUtils.initStorageDirectory();
 
-               this.storageDir = BlobUtils.initStorageDirectory();
+                       LOG.info("Created BLOB server storage directory " + 
storageDir);
 
-               LOG.info("Created BLOB server storage directory " + storageDir);
+                       shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+               }
+               catch (IOException e) {
+                       throw new IOException("Could not create BlobServer with 
random port.", e);
+               }
        }
 
        /**
@@ -172,12 +183,12 @@ public final class BlobServer extends Thread implements 
BlobService{
 
                try {
 
-                       while (!this.shutdownRequested) {
+                       while (!this.shutdownRequested.get()) {
                                new BlobConnection(this.serverSocket.accept(), 
this).start();
                        }
 
                } catch (IOException ioe) {
-                       if (!this.shutdownRequested && LOG.isErrorEnabled()) {
+                       if (!this.shutdownRequested.get() && 
LOG.isErrorEnabled()) {
                                LOG.error("Blob server stopped working.", ioe);
                        }
                }
@@ -188,23 +199,28 @@ public final class BlobServer extends Thread implements 
BlobService{
         */
        @Override
        public void shutdown() throws IOException {
-
-               this.shutdownRequested = true;
-               try {
-                       this.serverSocket.close();
-               } catch (IOException ioe) {
+               if (shutdownRequested.compareAndSet(false, true)) {
+                       try {
+                               this.serverSocket.close();
+                       }
+                       catch (IOException ioe) {
                                LOG.debug("Error while closing the server 
socket.", ioe);
-               }
-               try {
-                       join();
-               } catch (InterruptedException ie) {
-                       LOG.debug("Error while waiting for this thread to 
die.", ie);
-               }
+                       }
+                       try {
+                               join();
+                       }
+                       catch (InterruptedException ie) {
+                               LOG.debug("Error while waiting for this thread 
to die.", ie);
+                       }
 
-               // Clean up the storage directory
-               FileUtils.deleteDirectory(storageDir);
+                       // Clean up the storage directory
+                       FileUtils.deleteDirectory(storageDir);
 
-               // TODO: Find/implement strategy to handle content-addressable 
BLOBs
+                       // Remove shutdown hook to prevent resource leaks
+                       Runtime.getRuntime().removeShutdownHook(shutdownHook);
+
+                       // TODO: Find/implement strategy to handle 
content-addressable BLOBs
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/5b420d84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index dec574d..476f481 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -23,6 +23,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
@@ -31,7 +32,10 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.UUID;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 public class BlobUtils {
+
        /**
         * Algorithm to be used for calculating the BLOB keys.
         */
@@ -52,7 +56,6 @@ public class BlobUtils {
         */
        static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
 
-
        /**
         * Creates a storage directory for a blob service.
         *
@@ -192,4 +195,29 @@ public class BlobUtils {
                        throw new RuntimeException(e);
                }
        }
+
+       /**
+        * Adds a shutdown hook to the JVM and returns the Thread, which has 
been registered.
+        */
+       static Thread addShutdownHook(final BlobService service, final Logger 
logger) {
+               checkNotNull(service);
+               checkNotNull(logger);
+
+               final Thread shutdownHook = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       service.shutdown();
+                               }
+                               catch (Throwable t) {
+                                       logger.error("Error during shutdown of 
blob service via JVM shutdown hook: " + t.getMessage(), t);
+                               }
+                       }
+               });
+
+               // Add JVM shutdown hook to call shutdown of service
+               Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+               return shutdownHook;
+       }
 }

Reply via email to