[FLINK-2805] [blobmanager] Write JARs to file state backend for recovery

Move StateBackend enum to top level, into org.apache.flink.runtime.state

Abstract the blob store in the blob server for recovery

This closes #1227.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3a4d1d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3a4d1d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3a4d1d9

Branch: refs/heads/master
Commit: c3a4d1d9f720a1da9697d0bbf48f7a3b1f5851b8
Parents: c2989f2
Author: Ufuk Celebi <[email protected]>
Authored: Mon Oct 5 14:30:46 2015 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue Oct 20 00:16:52 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobServer.java   | 105 +++++++++--
 .../runtime/blob/BlobServerConnection.java      |  52 ++++--
 .../apache/flink/runtime/blob/BlobStore.java    |  97 ++++++++++
 .../apache/flink/runtime/blob/BlobUtils.java    |  75 +++++++-
 .../flink/runtime/blob/FileSystemBlobStore.java | 186 +++++++++++++++++++
 .../flink/runtime/blob/VoidBlobStore.java       |  61 ++++++
 .../flink/runtime/jobmanager/RecoveryMode.java  |  12 +-
 .../flink/runtime/blob/BlobRecoveryITCase.java  | 159 ++++++++++++++++
 .../BlobLibraryCacheRecoveryITCase.java         | 176 ++++++++++++++++++
 9 files changed, 874 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
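
For orientation, the core of the change is the new BlobStore abstraction: the BlobServer now
picks a store implementation based on the configured recovery mode. A condensed sketch of the
selection logic from the BlobServer constructor below (class and field declarations elided,
names as in the diff):

    RecoveryMode recoveryMode = RecoveryMode.fromConfig(config);

    if (recoveryMode == RecoveryMode.STANDALONE) {
        // No recovery: all store operations are no-ops
        this.blobStore = new VoidBlobStore();
    }
    else if (config.containsKey(ConfigConstants.STATE_BACKEND)
            && config.containsKey(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)) {
        // Recovery: mirror uploaded BLOBs (JARs) to the file system state backend
        this.blobStore = new FileSystemBlobStore(config);
    }
    else {
        // Fallback when the state backend is not fully configured
        this.blobStore = new VoidBlobStore();
    }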


http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index ef2ef61..d0bed8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -18,6 +18,14 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -30,13 +38,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.common.JobID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * This class implements the BLOB server. The BLOB server is responsible for 
listening for incoming requests and
@@ -57,12 +59,12 @@ public class BlobServer extends Thread implements 
BlobService {
        /** Indicates whether a shutdown of server component has been 
requested. */
        private final AtomicBoolean shutdownRequested = new AtomicBoolean();
 
-       /** Shutdown hook thread to ensure deletion of the storage directory. */
-       private final Thread shutdownHook;
-
        /** Is the root directory for file storage */
        private final File storageDir;
 
+       /** Blob store for recovery */
+       private final BlobStore blobStore;
+
        /** Set of currently running threads */
        private final Set<BlobServerConnection> activeConnections = new 
HashSet<BlobServerConnection>();
 
@@ -70,18 +72,43 @@ public class BlobServer extends Thread implements 
BlobService {
        private final int maxConnections;
 
        /**
+        * Shutdown hook thread to ensure deletion of the storage directory (or <code>null</code> if
+        * the configured recovery mode does not equal {@link RecoveryMode#STANDALONE})
+        */
+       private final Thread shutdownHook;
+
+       /**
         * Instantiates a new BLOB server and binds it to a free network port.
-        * 
+        *
         * @throws IOException
         *         thrown if the BLOB server cannot bind to a free network port
         */
        public BlobServer(Configuration config) throws IOException {
+               checkNotNull(config, "Configuration");
+
+               RecoveryMode recoveryMode = RecoveryMode.fromConfig(config);
 
                // configure and create the storage directory
                String storageDirectory = 
config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
                this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
                LOG.info("Created BLOB server storage directory {}", 
storageDir);
 
+               // No recovery.
+               if (recoveryMode == RecoveryMode.STANDALONE) {
+                       this.blobStore = new VoidBlobStore();
+               }
+               // Recovery. Check that everything has been set up correctly. This is not clean, but it's
+               // better to resolve this with some upcoming changes to the state backend setup.
+               else if (config.containsKey(ConfigConstants.STATE_BACKEND) &&
+                               config.containsKey(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)) {
+
+                       this.blobStore = new FileSystemBlobStore(config);
+               }
+               // Fallback.
+               else {
+                       this.blobStore = new VoidBlobStore();
+               }
+
                // configure the maximum number of concurrent connections
                final int maxConnections = config.getInteger(
                                ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, 
ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
@@ -102,8 +129,13 @@ public class BlobServer extends Thread implements 
BlobService {
                        backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
                }
 
-               // Add shutdown hook to delete storage directory
-               this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+               if (recoveryMode == RecoveryMode.STANDALONE) {
+                       // Add shutdown hook to delete storage directory
+                       this.shutdownHook = BlobUtils.addShutdownHook(this, 
LOG);
+               }
+               else {
+                       this.shutdownHook = null;
+               }
 
                // start the server
                try {
@@ -132,37 +164,43 @@ public class BlobServer extends Thread implements 
BlobService {
         * Returns a file handle to the file associated with the given blob key 
on the blob
         * server.
         *
+        * <p><strong>This is only called from the {@link 
BlobServerConnection}</strong>
+        *
         * @param key identifying the file
         * @return file handle to the file
         */
-       public File getStorageLocation(BlobKey key) {
+       File getStorageLocation(BlobKey key) {
                return BlobUtils.getStorageLocation(storageDir, key);
        }
 
        /**
         * Returns a file handle to the file identified by the given jobID and 
key.
         *
+        * <p><strong>This is only called from the {@link 
BlobServerConnection}</strong>
+        *
         * @param jobID to which the file is associated
         * @param key to identify the file within the job context
         * @return file handle to the file
         */
-       public File getStorageLocation(JobID jobID, String key) {
+       File getStorageLocation(JobID jobID, String key) {
                return BlobUtils.getStorageLocation(storageDir, jobID, key);
        }
 
        /**
         * Method which deletes all files associated with the given jobID.
         *
+        * <p><strong>This is only called from the {@link 
BlobServerConnection}</strong>
+        *
         * @param jobID all files associated to this jobID will be deleted
         * @throws IOException
         */
-       public void deleteJobDirectory(JobID jobID) throws IOException {
+       void deleteJobDirectory(JobID jobID) throws IOException {
                BlobUtils.deleteJobDirectory(storageDir, jobID);
        }
 
        /**
         * Returns a temporary file inside the BLOB server's incoming directory.
-        * 
+        *
         * @return a temporary file inside the BLOB server's incoming directory
         */
        File createTemporaryFilename() {
@@ -170,6 +208,13 @@ public class BlobServer extends Thread implements 
BlobService {
                                String.format("temp-%08d", 
tempFileCounter.getAndIncrement()));
        }
 
+       /**
+        * Returns the blob store.
+        */
+       BlobStore getBlobStore() {
+               return blobStore;
+       }
+
        @Override
        public void run() {
                try {
@@ -245,6 +290,9 @@ public class BlobServer extends Thread implements 
BlobService {
                                LOG.error("BLOB server failed to properly clean 
up its storage directory.");
                        }
 
+                       // Clean up the recovery directory
+                       blobStore.cleanUp();
+
                        // Remove shutdown hook to prevent resource leaks, 
unless this is invoked by the
                        // shutdown hook itself
                        if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
@@ -282,11 +330,26 @@ public class BlobServer extends Thread implements 
BlobService {
 
                final File localFile = BlobUtils.getStorageLocation(storageDir, 
requiredBlob);
 
-               if (!localFile.exists()) {
-                       throw new FileNotFoundException("File " + 
localFile.getCanonicalPath() + " does not exist.");
-               } else {
+               if (localFile.exists()) {
                        return localFile.toURI().toURL();
                }
+               else {
+                       try {
+                               // Try the blob store
+                               blobStore.get(requiredBlob, localFile);
+                       }
+                       catch (Exception e) {
+                               throw new IOException("Failed to copy from blob 
store.", e);
+                       }
+
+                       if (localFile.exists()) {
+                               return localFile.toURI().toURL();
+                       }
+                       else {
+                               throw new FileNotFoundException("Local file " + 
localFile + " does not exist " +
+                                               "and failed to copy from blob 
store.");
+                       }
+               }
        }
 
        /**
@@ -305,6 +368,8 @@ public class BlobServer extends Thread implements 
BlobService {
                                LOG.warn("Failed to delete locally BLOB " + key 
+ " at " + localFile.getAbsolutePath());
                        }
                }
+
+               blobStore.delete(key);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 793a9d6..d7bba8f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.runtime.blob;
 
+import com.google.common.io.Files;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
@@ -29,28 +35,21 @@ import java.net.Socket;
 import java.net.SocketException;
 import java.security.MessageDigest;
 
-import com.google.common.io.Files;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.util.InstantiationUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static 
org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
+import static 
org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
 import static 
org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
 import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
 import static org.apache.flink.runtime.blob.BlobUtils.readFully;
 import static org.apache.flink.runtime.blob.BlobUtils.readLength;
 import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
 
-import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
-import static 
org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
-import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
-
 /**
  * A BLOB connection handles a series of requests from a particular BLOB 
client.
  */
@@ -181,10 +180,18 @@ class BlobServerConnection extends Thread {
                                JobID jobID = JobID.fromByteArray(jidBytes);
                                String key = readKey(buf, inputStream);
                                blobFile = 
this.blobServer.getStorageLocation(jobID, key);
+
+                               if (!blobFile.exists()) {
+                                       blobServer.getBlobStore().get(jobID, 
key, blobFile);
+                               }
                        }
                        else if (contentAddressable == CONTENT_ADDRESSABLE) {
                                final BlobKey key = 
BlobKey.readFromInputStream(inputStream);
                                blobFile = blobServer.getStorageLocation(key);
+
+                               if (!blobFile.exists()) {
+                                       blobServer.getBlobStore().get(key, 
blobFile);
+                               }
                        }
                        else {
                                throw new IOException("Unknown type of BLOB 
addressing.");
@@ -194,6 +201,7 @@ class BlobServerConnection extends Thread {
                        if (!blobFile.exists()) {
                                throw new IOException("Cannot find required 
BLOB at " + blobFile.getAbsolutePath());
                        }
+
                        if (blobFile.length() > Integer.MAX_VALUE) {
                                throw new IOException("BLOB size exceeds the 
maximum size (2 GB).");
                        }
@@ -220,8 +228,7 @@ class BlobServerConnection extends Thread {
                        int blobLen = (int) blobFile.length();
                        writeLength(blobLen, outputStream);
 
-                       FileInputStream fis = new FileInputStream(blobFile);
-                       try {
+                       try (FileInputStream fis = new 
FileInputStream(blobFile)) {
                                int bytesRemaining = blobLen;
                                while (bytesRemaining > 0) {
                                        int read = fis.read(buf);
@@ -231,8 +238,6 @@ class BlobServerConnection extends Thread {
                                        outputStream.write(buf, 0, read);
                                        bytesRemaining -= read;
                                }
-                       } finally {
-                               fis.close();
                        }
                }
                catch (SocketException e) {
@@ -314,6 +319,9 @@ class BlobServerConnection extends Thread {
                                File storageFile = 
this.blobServer.getStorageLocation(jobID, key);
                                Files.move(incomingFile, storageFile);
                                incomingFile = null;
+
+                               blobServer.getBlobStore().put(storageFile, 
jobID, key);
+
                                outputStream.write(RETURN_OKAY);
                        }
                        else {
@@ -322,6 +330,8 @@ class BlobServerConnection extends Thread {
                                Files.move(incomingFile, storageFile);
                                incomingFile = null;
 
+                               blobServer.getBlobStore().put(storageFile, 
blobKey);
+
                                // Return computed key to client for validation
                                outputStream.write(RETURN_OKAY);
                                blobKey.writeToOutputStream(outputStream);
@@ -379,6 +389,8 @@ class BlobServerConnection extends Thread {
                                if (blobFile.exists() && !blobFile.delete()) {
                                        throw new IOException("Cannot delete 
BLOB file " + blobFile.getAbsolutePath());
                                }
+
+                               blobServer.getBlobStore().delete(key);
                        }
                        else if (type == NAME_ADDRESSABLE) {
                                byte[] jidBytes = new byte[JobID.SIZE];
@@ -391,6 +403,8 @@ class BlobServerConnection extends Thread {
                                if (blobFile.exists() && !blobFile.delete()) {
                                        throw new IOException("Cannot delete 
BLOB file " + blobFile.getAbsolutePath());
                                }
+
+                               blobServer.getBlobStore().delete(jobID, key);
                        }
                        else if (type == JOB_ID_SCOPE) {
                                byte[] jidBytes = new byte[JobID.SIZE];
@@ -398,6 +412,8 @@ class BlobServerConnection extends Thread {
                                JobID jobID = JobID.fromByteArray(jidBytes);
 
                                blobServer.deleteJobDirectory(jobID);
+
+                               blobServer.getBlobStore().deleteAll(jobID);
                        }
                        else {
                                throw new IOException("Unrecognized addressing 
type: " + type);

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
new file mode 100644
index 0000000..1e72d91
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+
+/**
+ * A blob store.
+ */
+interface BlobStore {
+
+       /**
+        * Copies the local file to the blob store.
+        *
+        * @param localFile The file to copy
+        * @param blobKey   The ID for the file in the blob store
+        * @throws Exception If the copy fails
+        */
+       void put(File localFile, BlobKey blobKey) throws Exception;
+
+       /**
+        * Copies a local file to the blob store.
+        *
+        * <p>The job ID and key make up a composite key for the file.
+        *
+        * @param localFile The file to copy
+        * @param jobId     The JobID part of ID for the file in the blob store
+        * @param key       The String part of ID for the file in the blob store
+        * @throws Exception If the copy fails
+        */
+       void put(File localFile, JobID jobId, String key) throws Exception;
+
+       /**
+        * Copies a blob to a local file.
+        *
+        * @param blobKey   The blob ID
+        * @param localFile The local file to copy to
+        * @throws Exception If the copy fails
+        */
+       void get(BlobKey blobKey, File localFile) throws Exception;
+
+       /**
+        * Copies a blob to a local file.
+        *
+        * @param jobId     The JobID part of ID for the blob
+        * @param key       The String part of ID for the blob
+        * @param localFile The local file to copy to
+        * @throws Exception If the copy fails
+        */
+       void get(JobID jobId, String key, File localFile) throws Exception;
+
+       /**
+        * Deletes a blob.
+        *
+        * @param blobKey The blob ID
+        */
+       void delete(BlobKey blobKey);
+
+       /**
+        * Deletes a blob.
+        *
+        * @param jobId The JobID part of ID for the blob
+        * @param key   The String part of ID for the blob
+        */
+       void delete(JobID jobId, String key);
+
+       /**
+        * Deletes blobs.
+        *
+        * @param jobId The JobID part of all blobs to delete
+        */
+       void deleteAll(JobID jobId);
+
+       /**
+        * Cleans up the store and deletes all blobs.
+        */
+       void cleanUp();
+
+}
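
The interface mirrors the server's two addressing modes (content-addressable via BlobKey, and
name-addressable via JobID plus String key). A minimal sketch of the mirror-and-recover round
trip as wired up in BlobServerConnection above; "store" and "blobKey" are placeholders here
and exception handling is omitted:

    // After a successful PUT, mirror the locally stored file to the store:
    File storageFile = blobServer.getStorageLocation(blobKey);
    store.put(storageFile, blobKey);

    // On a later GET that misses locally (e.g. after a JobManager fail-over),
    // restore the file from the store before serving it:
    File localFile = blobServer.getStorageLocation(blobKey);
    if (!localFile.exists()) {
        store.get(blobKey, localFile);
    }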

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index c47ecf2..d8f744b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -21,14 +21,19 @@ package org.apache.flink.runtime.blob;
 import com.google.common.io.BaseEncoding;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.util.IOUtils;
 import org.slf4j.Logger;
 
 import java.io.EOFException;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.net.URI;
 import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -49,12 +54,12 @@ public class BlobUtils {
        /**
         * The prefix of all BLOB files stored by the BLOB server.
         */
-       private static final String BLOB_FILE_PREFIX = "blob_";
+       static final String BLOB_FILE_PREFIX = "blob_";
 
        /**
         * The prefix of all job-specific directories created by the BLOB 
server.
         */
-       private static final String JOB_DIR_PREFIX = "job_";
+       static final String JOB_DIR_PREFIX = "job_";
 
        /**
         * The default character set to translate between characters and bytes.
@@ -103,7 +108,7 @@ public class BlobUtils {
        static File getIncomingDirectory(File storageDir) {
                final File incomingDir = new File(storageDir, "incoming");
 
-               if (!incomingDir.exists() && !incomingDir.mkdir()) {
+               if (!incomingDir.exists() && !incomingDir.mkdirs()) {
                        throw new RuntimeException("Cannot create directory for 
incoming files " + incomingDir.getAbsolutePath());
                }
 
@@ -119,7 +124,7 @@ public class BlobUtils {
        private static File getCacheDirectory(File storageDir) {
                final File cacheDirectory = new File(storageDir, "cache");
 
-               if (!cacheDirectory.exists() && !cacheDirectory.mkdir()) {
+               if (!cacheDirectory.exists() && !cacheDirectory.mkdirs()) {
                        throw new RuntimeException("Could not create cache 
directory '" + cacheDirectory.getAbsolutePath() + "'.");
                }
 
@@ -174,7 +179,7 @@ public class BlobUtils {
         *        the user's key for a BLOB
         * @return the internal name for the BLOB as used by the BLOB server
         */
-       private static String encodeKey(String key) {
+       static String encodeKey(String key) {
                return 
BaseEncoding.base64().encode(key.getBytes(DEFAULT_CHARSET));
        }
 
@@ -327,6 +332,66 @@ public class BlobUtils {
        }
 
        /**
+        * Returns the path for the given blob key.
+        *
+        * <p>The returned path can be used with the state backend for recovery 
purposes.
+        *
+        * <p>This follows the same scheme as {@link #getStorageLocation(File, 
BlobKey)}.
+        */
+       static String getRecoveryPath(String basePath, BlobKey blobKey) {
+               // format: $base/cache/blob_$key
+               return String.format("%s/cache/%s", basePath, BLOB_FILE_PREFIX 
+ blobKey.toString());
+       }
+
+       /**
+        * Returns the path for the given job ID and key.
+        *
+        * <p>The returned path can be used with the state backend for recovery 
purposes.
+        *
+        * <p>This follows the same scheme as {@link #getStorageLocation(File, 
JobID, String)}.
+        */
+       static String getRecoveryPath(String basePath, JobID jobId, String key) 
{
+               // format: $base/job_$id/blob_$key
+               return String.format("%s/%s/%s", basePath, JOB_DIR_PREFIX + 
jobId.toString(),
+                               BLOB_FILE_PREFIX + encodeKey(key));
+       }
+
+       /**
+        * Returns the path for the given job ID.
+        *
+        * <p>The returned path can be used with the state backend for recovery 
purposes.
+        */
+       static String getRecoveryPath(String basePath, JobID jobId) {
+               return String.format("%s/%s", basePath, JOB_DIR_PREFIX + 
jobId.toString());
+       }
+
+       /**
+        * Copies the file from the recovery path to the local file.
+        */
+       static void copyFromRecoveryPath(String recoveryPath, File 
localBlobFile) throws Exception {
+               if (recoveryPath == null) {
+                       throw new IllegalStateException("Failed to determine 
recovery path.");
+               }
+
+               if (!localBlobFile.createNewFile()) {
+                       throw new IllegalStateException("Failed to create new 
local file to copy to");
+               }
+
+               URI uri = new URI(recoveryPath);
+               Path path = new Path(recoveryPath);
+
+               if (FileSystem.get(uri).exists(path)) {
+                       try (InputStream is = FileSystem.get(uri).open(path)) {
+                               FileOutputStream fos = new 
FileOutputStream(localBlobFile);
+                               IOUtils.copyBytes(is, fos); // closes the 
streams
+                       }
+               }
+               else {
+                       throw new IOException("Cannot find required BLOB at '" 
+ recoveryPath + "' for recovery.");
+               }
+       }
+
+       /**
         * Private constructor to prevent instantiation.
         */
        private BlobUtils() {
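
Put together, the recovery layout under the configured base path mirrors the local storage
layout of the BLOB server. A hypothetical example, assuming a recovery path of
hdfs:///flink/recovery (the FileSystemBlobStore appends "/blob" to it):

    hdfs:///flink/recovery/blob/cache/blob_<blob-key>               (content-addressable BLOBs)
    hdfs:///flink/recovery/blob/job_<job-id>/blob_<base64(key)>     (name-addressable BLOBs, per job)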

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
new file mode 100644
index 0000000..8a037ad
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import com.google.common.io.Files;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Blob store backed by {@link FileSystem}.
+ */
+class FileSystemBlobStore implements BlobStore {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBlobStore.class);
+
+       /** The base path of the blob store */
+       private final String basePath;
+
+       FileSystemBlobStore(Configuration config) throws IOException {
+               StateBackend stateBackend = StateBackend.fromConfig(config);
+
+               if (stateBackend == StateBackend.FILESYSTEM) {
+                       String stateBackendBasePath = config.getString(
+                                       
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
+
+                       if (stateBackendBasePath.equals("")) {
+                               throw new 
IllegalConfigurationException(String.format("Missing configuration for " +
+                                               "file system state backend 
recovery path. Please specify via " +
+                                               "'%s' key.", 
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+                       }
+
+                       stateBackendBasePath += "/blob";
+
+                       this.basePath = stateBackendBasePath;
+
+                       try {
+                               FileSystem.get(new URI(basePath)).mkdirs(new 
Path(basePath));
+                       }
+                       catch (URISyntaxException e) {
+                               throw new IOException(e);
+                       }
+
+                       LOG.info("Created blob directory {}.", basePath);
+               }
+               else {
+                       // Nothing else is supported at the moment
+                       throw new IllegalConfigurationException(
+                                       String.format("Illegal state backend " +
+                                                                       
"configuration '%s'. Please configure '%s' as state " +
+                                                                       
"backend and specify the recovery path via '%s' key.",
+                                                       stateBackend, 
StateBackend.FILESYSTEM,
+                                                       
ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+               }
+       }
+
+       // - Put 
------------------------------------------------------------------
+
+       @Override
+       public void put(File localFile, BlobKey blobKey) throws Exception {
+               put(localFile, BlobUtils.getRecoveryPath(basePath, blobKey));
+       }
+
+       @Override
+       public void put(File localFile, JobID jobId, String key) throws 
Exception {
+               put(localFile, BlobUtils.getRecoveryPath(basePath, jobId, key));
+       }
+
+       private void put(File fromFile, String toBlobPath) throws Exception {
+               try (OutputStream os = FileSystem.get(new URI(toBlobPath))
+                               .create(new Path(toBlobPath), true)) {
+
+                       LOG.debug("Copying from {} to {}.", fromFile, 
toBlobPath);
+                       Files.copy(fromFile, os);
+               }
+       }
+
+       // - Get 
------------------------------------------------------------------
+
+       @Override
+       public void get(BlobKey blobKey, File localFile) throws Exception {
+               get(BlobUtils.getRecoveryPath(basePath, blobKey), localFile);
+       }
+
+       @Override
+       public void get(JobID jobId, String key, File localFile) throws 
Exception {
+               get(BlobUtils.getRecoveryPath(basePath, jobId, key), localFile);
+       }
+
+       private void get(String fromBlobPath, File toFile) throws Exception {
+               checkNotNull(fromBlobPath, "Blob path");
+               checkNotNull(toFile, "File");
+
+               if (!toFile.exists() && !toFile.createNewFile()) {
+                       throw new IllegalStateException("Failed to create 
target file to copy to");
+               }
+
+               final URI fromUri = new URI(fromBlobPath);
+               final Path fromPath = new Path(fromBlobPath);
+
+               if (FileSystem.get(fromUri).exists(fromPath)) {
+                       try (InputStream is = 
FileSystem.get(fromUri).open(fromPath)) {
+                               FileOutputStream fos = new 
FileOutputStream(toFile);
+
+                               LOG.debug("Copying from {} to {}.", 
fromBlobPath, toFile);
+                               IOUtils.copyBytes(is, fos); // closes the 
streams
+                       }
+               }
+               else {
+                       throw new IOException(fromBlobPath + " does not 
exist.");
+               }
+       }
+
+       // - Delete 
---------------------------------------------------------------
+
+       @Override
+       public void delete(BlobKey blobKey) {
+               delete(BlobUtils.getRecoveryPath(basePath, blobKey));
+       }
+
+       @Override
+       public void delete(JobID jobId, String key) {
+               delete(BlobUtils.getRecoveryPath(basePath, jobId, key));
+       }
+
+       @Override
+       public void deleteAll(JobID jobId) {
+               delete(BlobUtils.getRecoveryPath(basePath, jobId));
+       }
+
+       private void delete(String blobPath) {
+               try {
+                       LOG.debug("Deleting {}.", blobPath);
+
+                       FileSystem.get(new URI(blobPath)).delete(new 
Path(blobPath), true);
+               }
+               catch (Exception e) {
+                       LOG.warn("Failed to delete blob at " + blobPath);
+               }
+       }
+
+       @Override
+       public void cleanUp() {
+               try {
+                       LOG.debug("Cleaning up {}.", basePath);
+
+                       FileSystem.get(new URI(basePath)).delete(new 
Path(basePath), true);
+               }
+               catch (Exception e) {
+                       LOG.error("Failed to clean up recovery directory.");
+               }
+       }
+}
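
For reference, a configuration that activates this store, mirroring what the recovery tests
below set up. The recovery path used here is a hypothetical HDFS directory; any path reachable
by all JobManagers should work:

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.blob.BlobServer;

    public class BlobRecoveryConfigExample {
        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();
            config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
            config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
            // Hypothetical recovery path shared by all JobManagers
            config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "hdfs:///flink/recovery");

            // With this configuration the BlobServer mirrors uploaded BLOBs to the recovery path
            BlobServer server = new BlobServer(config);
            System.out.println("BLOB server listening on port " + server.getPort());
            server.shutdown();
        }
    }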

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
new file mode 100644
index 0000000..1b71add
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.File;
+
+/**
+ * A blob store doing nothing.
+ */
+class VoidBlobStore implements BlobStore {
+
+       @Override
+       public void put(File localFile, BlobKey blobKey) throws Exception {
+       }
+
+       @Override
+       public void put(File localFile, JobID jobId, String key) throws 
Exception {
+       }
+
+       @Override
+       public void get(BlobKey blobKey, File localFile) throws Exception {
+       }
+
+       @Override
+       public void get(JobID jobId, String key, File localFile) throws 
Exception {
+       }
+
+       @Override
+       public void delete(BlobKey blobKey) {
+       }
+
+       @Override
+       public void delete(JobID jobId, String key) {
+       }
+
+       @Override
+       public void deleteAll(JobID jobId) {
+       }
+
+       @Override
+       public void cleanUp() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
index 17322d8..077e34d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/RecoveryMode.java
@@ -24,11 +24,11 @@ import org.apache.flink.configuration.Configuration;
 /**
  * Recovery mode for Flink's cluster execution. Currently supported modes are:
  *
- *   - Standalone: No recovery from JobManager failures
- *   - ZooKeeper: JobManager high availability via ZooKeeper
- *     ZooKeeper is used to select a leader among a group of JobManager. This 
JobManager
- *     is responsible for the job execution. Upon failure of the leader a new 
leader is elected
- *     which will take over the responsibilities of the old leader
+ * - Standalone: No recovery from JobManager failures
+ * - ZooKeeper: JobManager high availability via ZooKeeper
+ * ZooKeeper is used to select a leader among a group of JobManager. This 
JobManager
+ * is responsible for the job execution. Upon failure of the leader a new 
leader is elected
+ * which will take over the responsibilities of the old leader
  */
 public enum RecoveryMode {
        STANDALONE,
@@ -69,4 +69,4 @@ public enum RecoveryMode {
                                return false;
                }
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
new file mode 100644
index 0000000..0e324a8
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class BlobRecoveryITCase {
+
+       private File recoveryDir;
+
+       @Before
+       public void setUp() throws Exception {
+               recoveryDir = new File(FileUtils.getTempDirectory(), 
"BlobRecoveryITCaseDir");
+               if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
+                       throw new IllegalStateException("Failed to create temp 
directory for test");
+               }
+       }
+
+       @After
+       public void cleanUp() throws Exception {
+               if (recoveryDir != null) {
+                       FileUtils.deleteDirectory(recoveryDir);
+               }
+       }
+
+       /**
+        * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are 
recoverable from any
+        * participating BlobServer.
+        */
+       @Test
+       public void testBlobServerRecovery() throws Exception {
+               Random rand = new Random();
+
+               BlobServer[] server = new BlobServer[2];
+               InetSocketAddress[] serverAddress = new InetSocketAddress[2];
+               BlobClient client = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       config.setString(ConfigConstants.RECOVERY_MODE, 
"ZOOKEEPER");
+                       config.setString(ConfigConstants.STATE_BACKEND, 
"FILESYSTEM");
+                       
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, 
recoveryDir.getPath());
+
+                       for (int i = 0; i < server.length; i++) {
+                               server[i] = new BlobServer(config);
+                               serverAddress[i] = new 
InetSocketAddress("localhost", server[i].getPort());
+                       }
+
+                       client = new BlobClient(serverAddress[0]);
+
+                       // Random data
+                       byte[] expected = new byte[1024];
+                       rand.nextBytes(expected);
+
+                       BlobKey[] keys = new BlobKey[2];
+
+                       // Put data
+                       keys[0] = client.put(expected); // Request 1
+                       keys[1] = client.put(expected, 32, 256); // Request 2
+
+                       JobID[] jobId = new JobID[] { new JobID(), new JobID() 
};
+                       String[] testKey = new String[] { "test-key-1", 
"test-key-2" };
+
+                       client.put(jobId[0], testKey[0], expected); // Request 3
+                       client.put(jobId[1], testKey[1], expected, 32, 256); // 
Request 4
+
+                       // Close the client and connect to the other server
+                       client.close();
+                       client = new BlobClient(serverAddress[1]);
+
+                       // Verify request 1
+                       try (InputStream is = client.get(keys[0])) {
+                               byte[] actual = new byte[expected.length];
+
+                               BlobUtils.readFully(is, actual, 0, 
expected.length, null);
+
+                               for (int i = 0; i < expected.length; i++) {
+                                       assertEquals(expected[i], actual[i]);
+                               }
+                       }
+
+                       // Verify request 2
+                       try (InputStream is = client.get(keys[1])) {
+                               byte[] actual = new byte[256];
+                               BlobUtils.readFully(is, actual, 0, 256, null);
+
+                               for (int i = 32, j = 0; i < 256; i++, j++) {
+                                       assertEquals(expected[i], actual[j]);
+                               }
+                       }
+
+                       // Verify request 3
+                       try (InputStream is = client.get(jobId[0], testKey[0])) 
{
+                               byte[] actual = new byte[expected.length];
+                               BlobUtils.readFully(is, actual, 0, 
expected.length, null);
+
+                               for (int i = 0; i < expected.length; i++) {
+                                       assertEquals(expected[i], actual[i]);
+                               }
+                       }
+
+                       // Verify request 4
+                       try (InputStream is = client.get(jobId[1], testKey[1])) 
{
+                               byte[] actual = new byte[256];
+                               BlobUtils.readFully(is, actual, 0, 256, null);
+
+                               for (int i = 32, j = 0; i < 256; i++, j++) {
+                                       assertEquals(expected[i], actual[j]);
+                               }
+                       }
+               }
+               finally {
+                       for (BlobServer s : server) {
+                               if (s != null) {
+                                       s.shutdown();
+                               }
+                       }
+
+                       if (client != null) {
+                               client.close();
+                       }
+               }
+
+               // Verify everything is clean
+               File[] recoveryFiles = recoveryDir.listFiles();
+               assertEquals("Unclean state backend: " + 
Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c3a4d1d9/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
new file mode 100644
index 0000000..4df8afb
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution.librarycache;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class BlobLibraryCacheRecoveryITCase {
+
+       private File recoveryDir;
+
+       @Before
+       public void setUp() throws Exception {
+               recoveryDir = new File(FileUtils.getTempDirectory(), 
"BlobRecoveryITCaseDir");
+               if (!recoveryDir.exists() && !recoveryDir.mkdirs()) {
+                       throw new IllegalStateException("Failed to create temp 
directory for test");
+               }
+       }
+
+       @After
+       public void cleanUp() throws Exception {
+               if (recoveryDir != null) {
+                       FileUtils.deleteDirectory(recoveryDir);
+               }
+       }
+
+       /**
+        * Tests that with {@link RecoveryMode#ZOOKEEPER} distributed JARs are 
recoverable from any
+        * participating BlobLibraryCacheManager.
+        */
+       @Test
+       public void testRecoveryRegisterAndDownload() throws Exception {
+               Random rand = new Random();
+
+               BlobServer[] server = new BlobServer[2];
+               InetSocketAddress[] serverAddress = new InetSocketAddress[2];
+               BlobLibraryCacheManager[] libServer = new 
BlobLibraryCacheManager[2];
+               BlobCache cache = null;
+               BlobLibraryCacheManager libCache = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       config.setString(ConfigConstants.RECOVERY_MODE, 
"ZOOKEEPER");
+                       config.setString(ConfigConstants.STATE_BACKEND, 
"FILESYSTEM");
+                       
config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, 
recoveryDir.getPath());
+
+                       for (int i = 0; i < server.length; i++) {
+                               server[i] = new BlobServer(config);
+                               serverAddress[i] = new 
InetSocketAddress("localhost", server[i].getPort());
+                               libServer[i] = new 
BlobLibraryCacheManager(server[i], 3600 * 1000);
+                       }
+
+                       // Random data
+                       byte[] expected = new byte[1024];
+                       rand.nextBytes(expected);
+
+                       List<BlobKey> keys = new ArrayList<>(2);
+
+                       // Upload some data (libraries)
+                       try (BlobClient client = new 
BlobClient(serverAddress[0])) {
+                               keys.add(client.put(expected)); // Request 1
+                               keys.add(client.put(expected, 32, 256)); // 
Request 2
+                       }
+
+                       // The cache
+                       cache = new BlobCache(serverAddress[0], config);
+                       libCache = new BlobLibraryCacheManager(cache, 3600 * 
1000);
+
+                       // Register uploaded libraries
+                       JobID jobId = new JobID();
+                       ExecutionAttemptID executionId = new 
ExecutionAttemptID();
+                       libServer[0].registerTask(jobId, executionId, keys, 
Collections.<URL>emptyList());
+
+                       // Verify key 1
+                       File f = libCache.getFile(keys.get(0));
+                       assertEquals(expected.length, f.length());
+
+                       try (FileInputStream fis = new FileInputStream(f)) {
+                               for (int i = 0; i < expected.length && 
fis.available() > 0; i++) {
+                                       assertEquals(expected[i], (byte) 
fis.read());
+                               }
+
+                               assertEquals(0, fis.available());
+                       }
+
+                       // Shutdown cache and start with other server
+                       cache.shutdown();
+                       libCache.shutdown();
+
+                       cache = new BlobCache(serverAddress[1], config);
+                       libCache = new BlobLibraryCacheManager(cache, 3600 * 
1000);
+
+                       // Verify key 1
+                       f = libCache.getFile(keys.get(0));
+                       assertEquals(expected.length, f.length());
+
+                       try (FileInputStream fis = new FileInputStream(f)) {
+                               for (int i = 0; i < expected.length && 
fis.available() > 0; i++) {
+                                       assertEquals(expected[i], (byte) 
fis.read());
+                               }
+
+                               assertEquals(0, fis.available());
+                       }
+
+                       // Verify key 2
+                       f = libCache.getFile(keys.get(1));
+                       assertEquals(256, f.length());
+
+                       try (FileInputStream fis = new FileInputStream(f)) {
+                               for (int i = 0; i < 256 && fis.available() > 0; 
i++) {
+                                       assertEquals(expected[32 + i], (byte) 
fis.read());
+                               }
+
+                               assertEquals(0, fis.available());
+                       }
+               }
+               finally {
+                       for (BlobServer s : server) {
+                               if (s != null) {
+                                       s.shutdown();
+                               }
+                       }
+
+                       if (cache != null) {
+                               cache.shutdown();
+                       }
+
+                       if (libCache != null) {
+                               libCache.shutdown();
+                       }
+               }
+
+               // Verify everything is clean
+               File[] recoveryFiles = recoveryDir.listFiles();
+               assertEquals("Unclean state backend: " + 
Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
+       }
+}
