Repository: flink
Updated Branches:
  refs/heads/master 7407076d3 -> b88f909ce


[FLINK-1492] Fix exceptions on blob store shutdown

This closes #376


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b88f909c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b88f909c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b88f909c

Branch: refs/heads/master
Commit: b88f909ce44bbd25528afee03079d0437d5f9a5b
Parents: 7407076
Author: Ufuk Celebi <[email protected]>
Authored: Mon Feb 9 11:44:47 2015 +0100
Committer: Robert Metzger <[email protected]>
Committed: Tue Feb 10 15:32:19 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobCache.java    | 15 +++++--
 .../apache/flink/runtime/blob/BlobServer.java   | 46 ++++++++++++--------
 .../apache/flink/runtime/blob/BlobUtils.java    | 21 +++++----
 3 files changed, 52 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b88f909c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 3f0fcb6..d0d9a45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -29,6 +29,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * The BLOB cache implements a local cache for content-addressable BLOBs. When requesting BLOBs through the
@@ -48,6 +49,10 @@ public final class BlobCache implements BlobService {
 
        private final File storageDir;
 
+       private AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+       /** Shutdown hook thread to ensure deletion of the storage directory. */
+       private final Thread shutdownHook;
 
        public BlobCache(InetSocketAddress serverAddress) {
                this.serverAddress = serverAddress;
@@ -57,7 +62,7 @@ public final class BlobCache implements BlobService {
                LOG.info("Created BLOB cache storage directory " + storageDir);
 
                // Add shutdown hook to delete storage directory
-               BlobUtils.addDeleteDirectoryShutdownHook(storageDir, LOG);
+               shutdownHook = BlobUtils.addShutdownHook(this, LOG);
        }
 
        /**
@@ -155,7 +160,11 @@ public final class BlobCache implements BlobService {
        }
 
        @Override
-       public void shutdown() throws IOException{
-               FileUtils.deleteDirectory(storageDir);
+       public void shutdown() throws IOException {
+               if (shutdownRequested.compareAndSet(false, true)) {
+                       FileUtils.deleteDirectory(storageDir);
+
+                       Runtime.getRuntime().removeShutdownHook(shutdownHook);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b88f909c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index cf82a9b..068a859 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.ServerSocket;
 import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
@@ -86,7 +87,10 @@ public final class BlobServer extends Thread implements BlobService{
        /**
         * Indicates whether a shutdown of server component has been requested.
         */
-       private volatile boolean shutdownRequested = false;
+       private AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+       /** Shutdown hook thread to ensure deletion of the storage directory. */
+       private final Thread shutdownHook;
 
        /**
         * Is the root directory for file storage
@@ -114,8 +118,7 @@ public final class BlobServer extends Thread implements BlobService{
 
                        LOG.info("Created BLOB server storage directory " + storageDir);
 
-                       // Add shutdown hook to delete storage directory
-                       BlobUtils.addDeleteDirectoryShutdownHook(storageDir, LOG);
+                       shutdownHook = BlobUtils.addShutdownHook(this, LOG);
                }
                catch (IOException e) {
                        throw new IOException("Could not create BlobServer with random port.", e);
@@ -180,12 +183,12 @@ public final class BlobServer extends Thread implements BlobService{
 
                try {
 
-                       while (!this.shutdownRequested) {
+                       while (!this.shutdownRequested.get()) {
                                new BlobConnection(this.serverSocket.accept(), this).start();
                        }
 
                } catch (IOException ioe) {
-                       if (!this.shutdownRequested && LOG.isErrorEnabled()) {
+                       if (!this.shutdownRequested.get() && LOG.isErrorEnabled()) {
                                LOG.error("Blob server stopped working.", ioe);
                        }
                }
@@ -196,23 +199,28 @@ public final class BlobServer extends Thread implements BlobService{
         */
        @Override
        public void shutdown() throws IOException {
-
-               this.shutdownRequested = true;
-               try {
-                       this.serverSocket.close();
-               } catch (IOException ioe) {
+               if (shutdownRequested.compareAndSet(false, true)) {
+                       try {
+                               this.serverSocket.close();
+                       }
+                       catch (IOException ioe) {
                                LOG.debug("Error while closing the server socket.", ioe);
-               }
-               try {
-                       join();
-               } catch (InterruptedException ie) {
-                       LOG.debug("Error while waiting for this thread to die.", ie);
-               }
+                       }
+                       try {
+                               join();
+                       }
+                       catch (InterruptedException ie) {
+                               LOG.debug("Error while waiting for this thread to die.", ie);
+                       }
+
+                       // Clean up the storage directory
+                       FileUtils.deleteDirectory(storageDir);
 
-               // Clean up the storage directory
-               FileUtils.deleteDirectory(storageDir);
+                       // Remove shutdown hook to prevent resource leaks
+                       Runtime.getRuntime().removeShutdownHook(shutdownHook);
 
-               // TODO: Find/implement strategy to handle content-addressable BLOBs
+                       // TODO: Find/implement strategy to handle content-addressable BLOBs
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b88f909c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 751137a..476f481 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -197,22 +197,27 @@ public class BlobUtils {
        }
 
        /**
-        * Adds a shutdown hook to the JVM to delete the given directory.
+        * Adds a shutdown hook to the JVM and returns the Thread, which has been registered.
         */
-       static void addDeleteDirectoryShutdownHook(final File dir, final Logger errorLogger) {
-               checkNotNull(dir);
+       static Thread addShutdownHook(final BlobService service, final Logger logger) {
+               checkNotNull(service);
+               checkNotNull(logger);
 
-               // Add shutdown hook to delete directory
-               Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+               final Thread shutdownHook = new Thread(new Runnable() {
                        @Override
                        public void run() {
                                try {
-                                       FileUtils.deleteDirectory(dir);
+                                       service.shutdown();
                                }
                                catch (Throwable t) {
-                                       errorLogger.error("Error deleting directory " + dir + " during JVM shutdown: " + t.getMessage(), t);
+                                       logger.error("Error during shutdown of blob service via JVM shutdown hook: " + t.getMessage(), t);
                                }
                        }
-               }));
+               });
+
+               // Add JVM shutdown hook to call shutdown of service
+               Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+               return shutdownHook;
        }
 }

Reply via email to