Repository: flink Updated Branches: refs/heads/master 7407076d3 -> b88f909ce
[FLINK-1492] Fix exceptions on blob store shutdown This closes #376 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b88f909c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b88f909c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b88f909c Branch: refs/heads/master Commit: b88f909ce44bbd25528afee03079d0437d5f9a5b Parents: 7407076 Author: Ufuk Celebi <[email protected]> Authored: Mon Feb 9 11:44:47 2015 +0100 Committer: Robert Metzger <[email protected]> Committed: Tue Feb 10 15:32:19 2015 +0100 ---------------------------------------------------------------------- .../apache/flink/runtime/blob/BlobCache.java | 15 +++++-- .../apache/flink/runtime/blob/BlobServer.java | 46 ++++++++++++-------- .../apache/flink/runtime/blob/BlobUtils.java | 21 +++++---- 3 files changed, 52 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b88f909c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 3f0fcb6..d0d9a45 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -29,6 +29,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.URL; +import java.util.concurrent.atomic.AtomicBoolean; /** * The BLOB cache implements a local cache for content-addressable BLOBs. When requesting BLOBs through the @@ -48,6 +49,10 @@ public final class BlobCache implements BlobService { private final File storageDir; + private AtomicBoolean shutdownRequested = new AtomicBoolean(); + + /** Shutdown hook thread to ensure deletion of the storage directory. */ + private final Thread shutdownHook; public BlobCache(InetSocketAddress serverAddress) { this.serverAddress = serverAddress; @@ -57,7 +62,7 @@ public final class BlobCache implements BlobService { LOG.info("Created BLOB cache storage directory " + storageDir); // Add shutdown hook to delete storage directory - BlobUtils.addDeleteDirectoryShutdownHook(storageDir, LOG); + shutdownHook = BlobUtils.addShutdownHook(this, LOG); } /** @@ -155,7 +160,11 @@ public final class BlobCache implements BlobService { } @Override - public void shutdown() throws IOException{ - FileUtils.deleteDirectory(storageDir); + public void shutdown() throws IOException { + if (shutdownRequested.compareAndSet(false, true)) { + FileUtils.deleteDirectory(storageDir); + + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b88f909c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index cf82a9b..068a859 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -26,6 +26,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.URL; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; @@ -86,7 +87,10 @@ public final class BlobServer extends Thread implements BlobService{ /** * Indicates whether a shutdown of server component has been requested. */ - private volatile boolean shutdownRequested = false; + private AtomicBoolean shutdownRequested = new AtomicBoolean(); + + /** Shutdown hook thread to ensure deletion of the storage directory. */ + private final Thread shutdownHook; /** * Is the root directory for file storage @@ -114,8 +118,7 @@ public final class BlobServer extends Thread implements BlobService{ LOG.info("Created BLOB server storage directory " + storageDir); - // Add shutdown hook to delete storage directory - BlobUtils.addDeleteDirectoryShutdownHook(storageDir, LOG); + shutdownHook = BlobUtils.addShutdownHook(this, LOG); } catch (IOException e) { throw new IOException("Could not create BlobServer with random port.", e); @@ -180,12 +183,12 @@ public final class BlobServer extends Thread implements BlobService{ try { - while (!this.shutdownRequested) { + while (!this.shutdownRequested.get()) { new BlobConnection(this.serverSocket.accept(), this).start(); } } catch (IOException ioe) { - if (!this.shutdownRequested && LOG.isErrorEnabled()) { + if (!this.shutdownRequested.get() && LOG.isErrorEnabled()) { LOG.error("Blob server stopped working.", ioe); } } @@ -196,23 +199,28 @@ public final class BlobServer extends Thread implements BlobService{ */ @Override public void shutdown() throws IOException { - - this.shutdownRequested = true; - try { - this.serverSocket.close(); - } catch (IOException ioe) { + if (shutdownRequested.compareAndSet(false, true)) { + try { + this.serverSocket.close(); + } + catch (IOException ioe) { LOG.debug("Error while closing the server socket.", ioe); - } - try { - join(); - } catch (InterruptedException ie) { - LOG.debug("Error while waiting for this thread to die.", ie); - } + } + try { + join(); + } + catch (InterruptedException ie) { + LOG.debug("Error while waiting for this thread to die.", ie); + } + + // Clean up the storage directory + FileUtils.deleteDirectory(storageDir); - // Clean up the storage directory - FileUtils.deleteDirectory(storageDir); + // Remove shutdown hook to prevent resource leaks + Runtime.getRuntime().removeShutdownHook(shutdownHook); - // TODO: Find/implement strategy to handle content-addressable BLOBs + // TODO: Find/implement strategy to handle content-addressable BLOBs + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/b88f909c/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index 751137a..476f481 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -197,22 +197,27 @@ public class BlobUtils { } /** - * Adds a shutdown hook to the JVM to delete the given directory. + * Adds a shutdown hook to the JVM and returns the Thread, which has been registered. */ - static void addDeleteDirectoryShutdownHook(final File dir, final Logger errorLogger) { - checkNotNull(dir); + static Thread addShutdownHook(final BlobService service, final Logger logger) { + checkNotNull(service); + checkNotNull(logger); - // Add shutdown hook to delete directory - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + final Thread shutdownHook = new Thread(new Runnable() { @Override public void run() { try { - FileUtils.deleteDirectory(dir); + service.shutdown(); } catch (Throwable t) { - errorLogger.error("Error deleting directory " + dir + " during JVM shutdown: " + t.getMessage(), t); + logger.error("Error during shutdown of blob service via JVM shutdown hook: " + t.getMessage(), t); } } - })); + }); + + // Add JVM shutdown hook to call shutdown of service + Runtime.getRuntime().addShutdownHook(shutdownHook); + + return shutdownHook; } }
