Repository: flink
Updated Branches:
refs/heads/release-0.8 e01712111 -> 5b420d847
[FLINK-1492] Fix exceptions on blob store shutdown
Conflicts:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b420d84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b420d84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b420d84
Branch: refs/heads/release-0.8
Commit: 5b420d8477b2ed1dd72d3bbcade5fd0f72a6110b
Parents: e017121
Author: Ufuk Celebi <[email protected]>
Authored: Mon Feb 9 11:44:47 2015 +0100
Committer: Robert Metzger <[email protected]>
Committed: Tue Feb 10 14:40:35 2015 +0100
----------------------------------------------------------------------
.../apache/flink/runtime/blob/BlobCache.java | 16 ++++-
.../apache/flink/runtime/blob/BlobServer.java | 66 ++++++++++++--------
.../apache/flink/runtime/blob/BlobUtils.java | 30 ++++++++-
3 files changed, 84 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5b420d84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
----------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
index 0f57ea3..d0d9a45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java
@@ -29,6 +29,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* The BLOB cache implements a local cache for content-addressable BLOBs. When
requesting BLOBs through the
@@ -48,6 +49,10 @@ public final class BlobCache implements BlobService {
private final File storageDir;
+ private AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+ /** Shutdown hook thread to ensure deletion of the storage directory. */
+ private final Thread shutdownHook;
public BlobCache(InetSocketAddress serverAddress) {
this.serverAddress = serverAddress;
@@ -55,6 +60,9 @@ public final class BlobCache implements BlobService {
this.storageDir = BlobUtils.initStorageDirectory();
LOG.info("Created BLOB cache storage directory " + storageDir);
+
+ // Add shutdown hook to delete storage directory
+ shutdownHook = BlobUtils.addShutdownHook(this, LOG);
}
/**
@@ -152,7 +160,11 @@ public final class BlobCache implements BlobService {
}
@Override
- public void shutdown() throws IOException{
- FileUtils.deleteDirectory(storageDir);
+ public void shutdown() throws IOException {
+ if (shutdownRequested.compareAndSet(false, true)) {
+ FileUtils.deleteDirectory(storageDir);
+
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5b420d84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 60e1716..068a859 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -26,6 +26,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.URL;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
@@ -86,7 +87,10 @@ public final class BlobServer extends Thread implements
BlobService{
/**
* Indicates whether a shutdown of server component has been requested.
*/
- private volatile boolean shutdownRequested = false;
+ private AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+ /** Shutdown hook thread to ensure deletion of the storage directory. */
+ private final Thread shutdownHook;
/**
* Is the root directory for file storage
@@ -100,18 +104,25 @@ public final class BlobServer extends Thread implements
BlobService{
* thrown if the BLOB server cannot bind to a free network port
*/
public BlobServer() throws IOException {
+ try {
+ this.serverSocket = new ServerSocket(0);
- this.serverSocket = new ServerSocket(0);
- start();
+ start();
- if (LOG.isInfoEnabled()) {
- LOG.info(String.format("Started BLOB server on port %d",
- this.serverSocket.getLocalPort()));
- }
+ if (LOG.isInfoEnabled()) {
+ LOG.info(String.format("Started BLOB server on
port %d",
+
this.serverSocket.getLocalPort()));
+ }
+
+ this.storageDir = BlobUtils.initStorageDirectory();
- this.storageDir = BlobUtils.initStorageDirectory();
+ LOG.info("Created BLOB server storage directory " +
storageDir);
- LOG.info("Created BLOB server storage directory " + storageDir);
+ shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+ }
+ catch (IOException e) {
+ throw new IOException("Could not create BlobServer with
random port.", e);
+ }
}
/**
@@ -172,12 +183,12 @@ public final class BlobServer extends Thread implements
BlobService{
try {
- while (!this.shutdownRequested) {
+ while (!this.shutdownRequested.get()) {
new BlobConnection(this.serverSocket.accept(),
this).start();
}
} catch (IOException ioe) {
- if (!this.shutdownRequested && LOG.isErrorEnabled()) {
+ if (!this.shutdownRequested.get() &&
LOG.isErrorEnabled()) {
LOG.error("Blob server stopped working.", ioe);
}
}
@@ -188,23 +199,28 @@ public final class BlobServer extends Thread implements
BlobService{
*/
@Override
public void shutdown() throws IOException {
-
- this.shutdownRequested = true;
- try {
- this.serverSocket.close();
- } catch (IOException ioe) {
+ if (shutdownRequested.compareAndSet(false, true)) {
+ try {
+ this.serverSocket.close();
+ }
+ catch (IOException ioe) {
LOG.debug("Error while closing the server
socket.", ioe);
- }
- try {
- join();
- } catch (InterruptedException ie) {
- LOG.debug("Error while waiting for this thread to
die.", ie);
- }
+ }
+ try {
+ join();
+ }
+ catch (InterruptedException ie) {
+ LOG.debug("Error while waiting for this thread
to die.", ie);
+ }
- // Clean up the storage directory
- FileUtils.deleteDirectory(storageDir);
+ // Clean up the storage directory
+ FileUtils.deleteDirectory(storageDir);
- // TODO: Find/implement strategy to handle content-addressable
BLOBs
+ // Remove shutdown hook to prevent resource leaks
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
+
+ // TODO: Find/implement strategy to handle
content-addressable BLOBs
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/5b420d84/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index dec574d..476f481 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -23,6 +23,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobgraph.JobID;
+import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
@@ -31,7 +32,10 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
+import static com.google.common.base.Preconditions.checkNotNull;
+
public class BlobUtils {
+
/**
* Algorithm to be used for calculating the BLOB keys.
*/
@@ -52,7 +56,6 @@ public class BlobUtils {
*/
static final Charset DEFAULT_CHARSET = Charset.forName("utf-8");
-
/**
* Creates a storage directory for a blob service.
*
@@ -192,4 +195,29 @@ public class BlobUtils {
throw new RuntimeException(e);
}
}
+
+ /**
+ * Adds a shutdown hook to the JVM and returns the Thread, which has
been registered.
+ */
+ static Thread addShutdownHook(final BlobService service, final Logger
logger) {
+ checkNotNull(service);
+ checkNotNull(logger);
+
+ final Thread shutdownHook = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ service.shutdown();
+ }
+ catch (Throwable t) {
+ logger.error("Error during shutdown of
blob service via JVM shutdown hook: " + t.getMessage(), t);
+ }
+ }
+ });
+
+ // Add JVM shutdown hook to call shutdown of service
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+ return shutdownHook;
+ }
}