http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
new file mode 100644
index 0000000..0946d98
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketException;
+import java.security.MessageDigest;
+
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE;
+import static 
org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE;
+import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
+import static org.apache.flink.runtime.blob.BlobUtils.readFully;
+import static org.apache.flink.runtime.blob.BlobUtils.readLength;
+import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
+
+import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
+import static 
org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
+import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
+
+/**
+ * A BLOB connection handles a series of requests from a particular BLOB 
client.
+ */
+class BlobServerConnection extends Thread {
+
+       /** The log object used for debugging. */
+       private static final Logger LOG = 
LoggerFactory.getLogger(BlobServer.class);
+
+       /** The socket to communicate with the client. */
+       private final Socket clientSocket;
+
+       /** The BLOB server. */
+       private final BlobServer blobServer;
+
+       /**
+        * Creates a new BLOB connection for a client request
+        * 
+        * @param clientSocket The socket to read/write data.
+        * @param blobServer The BLOB server.
+        */
+       BlobServerConnection(Socket clientSocket, BlobServer blobServer) {
+               super("BLOB connection for " + 
clientSocket.getRemoteSocketAddress().toString());
+               setDaemon(true);
+
+               if (blobServer == null) {
+                       throw new NullPointerException();
+               }
+
+               this.clientSocket = clientSocket;
+               this.blobServer = blobServer;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Connection / Thread methods
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Main connection work method. Accepts requests until the other side 
closes the connection.
+        */
+       @Override
+       public void run() {
+               try {
+                       final InputStream inputStream = 
this.clientSocket.getInputStream();
+                       final OutputStream outputStream = 
this.clientSocket.getOutputStream();
+                       final byte[] buffer = new byte[BUFFER_SIZE];
+
+                       while (true) {
+                               // Read the requested operation
+                               final int operation = inputStream.read();
+                               if (operation < 0) {
+                                       // done, no one is asking anything from 
us
+                                       return;
+                               }
+
+                               switch (operation) {
+                               case PUT_OPERATION:
+                                       put(inputStream, outputStream, buffer);
+                                       break;
+                               case GET_OPERATION:
+                                       get(inputStream, outputStream, buffer);
+                                       break;
+                               case DELETE_OPERATION:
+                                       delete(inputStream, outputStream, 
buffer);
+                                       break;
+                               default:
+                                       throw new IOException("Unknown 
operation " + operation);
+                               }
+                       }
+               }
+               catch (SocketException e) {
+                       // this happens when the remote site closes the 
connection
+                       LOG.debug("Socket connection closed", e);
+               }
+               catch (Throwable t) {
+                       LOG.error("Error while executing BLOB connection.", t);
+               }
+               finally {
+                       try {
+                               if (clientSocket != null) {
+                                       clientSocket.close();
+                               }
+                       } catch (Throwable t) {
+                               LOG.debug("Exception while closing BLOB server 
connection socket.", t);
+                       }
+
+                       blobServer.unregisterConnection(this);
+               }
+       }
+
+       /**
+        * Closes the connection socket and lets the thread exit.
+        */
+       public void close() {
+               closeSilently(clientSocket, LOG);
+               interrupt();
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Actions
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Handles an incoming GET request from a BLOB client.
+        * 
+        * @param inputStream
+        *        the input stream to read incoming data from
+        * @param outputStream
+        *        the output stream to send data back to the client
+        * @param buf
+        *        an auxiliary buffer for data serialization/deserialization
+        * @throws IOException
+        *         thrown if an I/O error occurs while reading/writing data 
from/to the respective streams
+        */
+       private void get(InputStream inputStream, OutputStream outputStream, 
byte[] buf) throws IOException {
+
+               File blobFile;
+               try {
+                       final int contentAdressable = inputStream.read();
+
+                       if (contentAdressable < 0) {
+                               throw new EOFException("Premature end of GET 
request");
+                       }
+                       if (contentAdressable == NAME_ADDRESSABLE) {
+                               // Receive the job ID and key
+                               byte[] jidBytes = new byte[JobID.SIZE];
+                               readFully(inputStream, jidBytes, 0, JobID.SIZE, 
"JobID");
+
+                               JobID jobID = JobID.fromByteArray(jidBytes);
+                               String key = readKey(buf, inputStream);
+                               blobFile = 
this.blobServer.getStorageLocation(jobID, key);
+                       }
+                       else if (contentAdressable == CONTENT_ADDRESSABLE) {
+                               final BlobKey key = 
BlobKey.readFromInputStream(inputStream);
+                               blobFile = blobServer.getStorageLocation(key);
+                       }
+                       else {
+                               throw new IOException("Unknown type of BLOB 
addressing.");
+                       }
+
+                       // Check if BLOB exists
+                       if (!blobFile.exists()) {
+                               throw new IOException("Cannot find required 
BLOB at " + blobFile.getAbsolutePath());
+                       }
+                       if (blobFile.length() > Integer.MAX_VALUE) {
+                               throw new IOException("BLOB size exceeds the 
maximum size (2 GB).");
+                       }
+
+                       outputStream.write(RETURN_OKAY);
+
+                       // up to here, an error can give a good message
+               }
+               catch (Throwable t) {
+                       LOG.error("GET operation failed", t);
+                       try {
+                               writeErrorToStream(outputStream, t);
+                       }
+                       catch (IOException e) {
+                               // since we are in an exception case, it means 
not much that we could not send the error
+                               // ignore this
+                       }
+                       clientSocket.close();
+                       return;
+               }
+
+               // from here on, we started sending data, so all we can do is 
close the connection when something happens
+               try {
+                       int blobLen = (int) blobFile.length();
+                       writeLength(blobLen, outputStream);
+
+                       FileInputStream fis = new FileInputStream(blobFile);
+                       try {
+                               int bytesRemaining = blobLen;
+                               while (bytesRemaining > 0) {
+                                       int read = fis.read(buf);
+                                       if (read < 0) {
+                                               throw new 
IOException("Premature end of BLOB file stream for " + 
blobFile.getAbsolutePath());
+                                       }
+                                       outputStream.write(buf, 0, read);
+                                       bytesRemaining -= read;
+                               }
+                       } finally {
+                               fis.close();
+                       }
+               }
+               catch (SocketException e) {
+                       // happens when the other side disconnects
+                       LOG.debug("Socket connection closed", e);
+               }
+               catch (Throwable t) {
+                       LOG.error("GET operation failed", t);
+                       clientSocket.close();
+               }
+       }
+
+       /**
+        * Handles an incoming PUT request from a BLOB client.
+        * 
+        * @param inputStream The input stream to read incoming data from.
+        * @param outputStream The output stream to send data back to the 
client.
+        * @param buf An auxiliary buffer for data 
serialization/deserialization.
+        */
+       private void put(InputStream inputStream, OutputStream outputStream, 
byte[] buf) throws IOException {
+               JobID jobID = null;
+               String key = null;
+               MessageDigest md = null;
+
+               File incomingFile = null;
+               FileOutputStream fos = null;
+
+               try {
+                       final int contentAdressable = inputStream.read();
+                       if (contentAdressable < 0) {
+                               throw new EOFException("Premature end of PUT 
request");
+                       }
+
+                       if (contentAdressable == NAME_ADDRESSABLE) {
+                               // Receive the job ID and key
+                               byte[] jidBytes = new byte[JobID.SIZE];
+                               readFully(inputStream, jidBytes, 0, JobID.SIZE, 
"JobID");
+                               jobID = JobID.fromByteArray(jidBytes);
+                               key = readKey(buf, inputStream);
+                       }
+                       else if (contentAdressable == CONTENT_ADDRESSABLE) {
+                               md = BlobUtils.createMessageDigest();
+                       }
+                       else {
+                               throw new IOException("Unknown type of BLOB 
addressing.");
+                       }
+
+                       if (LOG.isDebugEnabled()) {
+                               if (contentAdressable == NAME_ADDRESSABLE) {
+                                       LOG.debug(String.format("Received PUT 
request for BLOB under %s / \"%s\"", jobID, key));
+                               } else {
+                                       LOG.debug("Received PUT request for 
content addressable BLOB");
+                               }
+                       }
+
+                       incomingFile = blobServer.createTemporaryFilename();
+                       fos = new FileOutputStream(incomingFile);
+
+                       while (true) {
+                               final int bytesExpected = 
readLength(inputStream);
+                               if (bytesExpected == -1) {
+                                       // done
+                                       break;
+                               }
+                               if (bytesExpected > BUFFER_SIZE) {
+                                       throw new IOException("Unexpected 
number of incoming bytes: " + bytesExpected);
+                               }
+
+                               readFully(inputStream, buf, 0, bytesExpected, 
"buffer");
+                               fos.write(buf, 0, bytesExpected);
+
+                               if (md != null) {
+                                       md.update(buf, 0, bytesExpected);
+                               }
+                       }
+
+                       fos.close();
+                       fos = null;
+
+                       if (contentAdressable == NAME_ADDRESSABLE) {
+                               File storageFile = 
this.blobServer.getStorageLocation(jobID, key);
+                               if (!incomingFile.renameTo(storageFile)) {
+                                       throw new 
IOException(String.format("Cannot move staging file %s to BLOB file %s",
+                                                       
incomingFile.getAbsolutePath(), storageFile.getAbsolutePath()));
+                               }
+                               incomingFile = null;
+                               outputStream.write(RETURN_OKAY);
+                       }
+                       else {
+                               BlobKey blobKey = new BlobKey(md.digest());
+                               File storageFile = 
blobServer.getStorageLocation(blobKey);
+                               if (!incomingFile.renameTo(storageFile)) {
+                                       throw new 
IOException(String.format("Cannot move staging file %s to BLOB file %s",
+                                                       
incomingFile.getAbsolutePath(), storageFile.getAbsolutePath()));
+                               }
+                               incomingFile = null;
+
+                               // Return computed key to client for validation
+                               outputStream.write(RETURN_OKAY);
+                               blobKey.writeToOutputStream(outputStream);
+                       }
+               }
+               catch (SocketException e) {
+                       // happens when the other side disconnects
+                       LOG.debug("Socket connection closed", e);
+               }
+               catch (Throwable t) {
+                       LOG.error("PUT operation failed", t);
+                       try {
+                               writeErrorToStream(outputStream, t);
+                       }
+                       catch (IOException e) {
+                               // since we are in an exception case, it means 
not much that we could not send the error
+                               // ignore this
+                       }
+                       clientSocket.close();
+               }
+               finally {
+                       if (fos != null) {
+                               try {
+                                       fos.close();
+                               } catch (Throwable t) {
+                                       LOG.warn("Cannot close stream to BLOB 
staging file", t);
+                               }
+                       }
+                       if (incomingFile != null) {
+                               if (!incomingFile.delete()) {
+                                       LOG.warn("Cannot delete BLOB server 
staging file " + incomingFile.getAbsolutePath());
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Handles an incoming DELETE request from a BLOB client.
+        * 
+        * @param inputStream The input stream to read the request from.
+        * @param outputStream The output stream to write the response to.
+        * @throws java.io.IOException Thrown if an I/O error occurs while 
reading the request data from the input stream.
+        */
+       private void delete(InputStream inputStream, OutputStream outputStream, 
byte[] buf) throws IOException {
+
+               try {
+                       int type = inputStream.read();
+                       if (type < 0) {
+                               throw new EOFException("Premature end of DELETE 
request");
+                       }
+
+                       if (type == CONTENT_ADDRESSABLE) {
+                               BlobKey key = 
BlobKey.readFromInputStream(inputStream);
+                               File blobFile = 
this.blobServer.getStorageLocation(key);
+                               if (blobFile.exists() && !blobFile.delete()) {
+                                       throw new IOException("Cannot delete 
BLOB file " + blobFile.getAbsolutePath());
+                               }
+                       }
+                       else if (type == NAME_ADDRESSABLE) {
+                               byte[] jidBytes = new byte[JobID.SIZE];
+                               readFully(inputStream, jidBytes, 0, JobID.SIZE, 
"JobID");
+                               JobID jobID = JobID.fromByteArray(jidBytes);
+
+                               String key = readKey(buf, inputStream);
+
+                               File blobFile = 
this.blobServer.getStorageLocation(jobID, key);
+                               if (blobFile.exists() && !blobFile.delete()) {
+                                       throw new IOException("Cannot delete 
BLOB file " + blobFile.getAbsolutePath());
+                               }
+                       }
+                       else if (type == JOB_ID_SCOPE) {
+                               byte[] jidBytes = new byte[JobID.SIZE];
+                               readFully(inputStream, jidBytes, 0, JobID.SIZE, 
"JobID");
+                               JobID jobID = JobID.fromByteArray(jidBytes);
+
+                               blobServer.deleteJobDirectory(jobID);
+                       }
+                       else {
+                               throw new IOException("Unrecognized addressing 
type: " + type);
+                       }
+
+                       outputStream.write(RETURN_OKAY);
+               }
+               catch (Throwable t) {
+                       LOG.error("DELETE operation failed", t);
+                       try {
+                               writeErrorToStream(outputStream, t);
+                       }
+                       catch (IOException e) {
+                               // since we are in an exception case, it means 
not much that we could not send the error
+                               // ignore this
+                       }
+                       clientSocket.close();
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Reads the key of a BLOB from the given input stream.
+        * 
+        * @param buf
+        *        auxiliary buffer to data deserialization
+        * @param inputStream
+        *        the input stream to read the key from
+        * @return the key of a BLOB
+        * @throws IOException
+        *         thrown if an I/O error occurs while reading the key data 
from the input stream
+        */
+       private static String readKey(byte[] buf, InputStream inputStream) 
throws IOException {
+               final int keyLength = readLength(inputStream);
+               if (keyLength > MAX_KEY_LENGTH) {
+                       throw new IOException("Unexpected key length " + 
keyLength);
+               }
+
+               readFully(inputStream, buf, 0, keyLength, "BlobKey");
+               return new String(buf, 0, keyLength, BlobUtils.DEFAULT_CHARSET);
+       }
+
+       /**
+        * Writes to the output stream the error return code, and the given 
exception in serialized form.
+        *
+        * @param out Thr output stream to write to.
+        * @param t The exception to send.
+        * @throws IOException Thrown, if the output stream could not be 
written to.
+        */
+       private static void writeErrorToStream(OutputStream out, Throwable t) 
throws IOException {
+               byte[] bytes = InstantiationUtil.serializeObject(t);
+               out.write(RETURN_ERROR);
+               writeLength(bytes.length, out);
+               out.write(bytes);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
new file mode 100644
index 0000000..6df7811
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerProtocol.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.blob;
+
+public class BlobServerProtocol {
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Constants used in the protocol of the BLOB store
+       // 
--------------------------------------------------------------------------------------------
+
+       /** The buffer size in bytes for network transfers. */
+       static final int BUFFER_SIZE = 65536; // 64 K
+
+       /** The maximum key length allowed for storing BLOBs. */
+       static final int MAX_KEY_LENGTH = 64;
+
+       /** Internal code to identify a PUT operation. */
+       static final byte PUT_OPERATION = 0;
+
+       /** Internal code to identify a GET operation. */
+       static final byte GET_OPERATION = 1;
+
+       /** Internal code to identify a DELETE operation. */
+       static final byte DELETE_OPERATION = 2;
+
+       /** Internal code to identify a successful operation. */
+       static final byte RETURN_OKAY = 0;
+
+       /** Internal code to identify an erroneous operation. */
+       static final byte RETURN_ERROR = 1;
+
+       /** Internal code to identify a reference via content hash as the key */
+       static final byte CONTENT_ADDRESSABLE = 0;
+
+       /** Internal code to identify a reference via jobId and name as the key 
*/
+       static final byte NAME_ADDRESSABLE = 1;
+
+       /** Internal code to identify a reference via jobId as the key */
+       static final byte JOB_ID_SCOPE = 2;
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private BlobServerProtocol() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
index 148476f..a2400b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobService.java
@@ -21,27 +21,32 @@ package org.apache.flink.runtime.blob;
 import java.io.IOException;
 import java.net.URL;
 
+/**
+ * A simple store and retrieve binary large objects (BLOBs).
+ */
 public interface BlobService {
+
        /**
         * This method returns the URL of the file associated with the provided 
blob key.
         *
-        * @param requiredBlob blob key associated with the requested file
-        * @return URL of the file
+        * @param key blob key associated with the requested file
+        * @return The URL to the file.
         * @throws IOException
         */
-       URL getURL(final BlobKey requiredBlob) throws IOException;
+       URL getURL(BlobKey key) throws IOException;
+
 
        /**
         * This method deletes the file associated with the provided blob key.
         *
-        * @param blobKey associated with the file to be deleted
+        * @param key associated with the file to be deleted
         * @throws IOException
         */
-       void delete(final BlobKey blobKey) throws IOException;
+       void delete(BlobKey key) throws IOException;
 
        /**
-        * Returns the port of the blob service
-        * @return the port of the blob service
+        * Returns the port of the blob service.
+        * @return the port of the blob service.
         */
        int getPort();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 53cab1c..5db5ef6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -23,8 +23,12 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.slf4j.Logger;
 
+import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
 import java.nio.charset.Charset;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -90,10 +94,13 @@ public class BlobUtils {
         * @return the BLOB server's directory for incoming files
         */
        static File getIncomingDirectory(File storageDir) {
-               final File incomingDirectory = new File(storageDir, "incoming");
-               incomingDirectory.mkdir();
+               final File incomingDir = new File(storageDir, "incoming");
 
-               return incomingDirectory;
+               if (!incomingDir.exists() && !incomingDir.mkdir()) {
+                       throw new RuntimeException("Cannot create directory for 
incoming files " + incomingDir.getAbsolutePath());
+               }
+
+               return incomingDir;
        }
 
        /**
@@ -106,7 +113,7 @@ public class BlobUtils {
                final File cacheDirectory = new File(storageDir, "cache");
 
                if (!cacheDirectory.exists() && !cacheDirectory.mkdir()) {
-                       throw new RuntimeException("Could not create cache 
directory '" + cacheDirectory + "'.");
+                       throw new RuntimeException("Could not create cache 
directory '" + cacheDirectory.getAbsolutePath() + "'.");
                }
 
                return cacheDirectory;
@@ -119,7 +126,7 @@ public class BlobUtils {
         *        the key identifying the BLOB
         * @return the (designated) physical storage location of the BLOB
         */
-       static File getStorageLocation(final File storageDir,  final BlobKey 
key) {
+       static File getStorageLocation(File storageDir, BlobKey key) {
                return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX 
+ key.toString());
        }
 
@@ -132,7 +139,7 @@ public class BlobUtils {
         *        the key of the BLOB
         * @return the (designated) physical storage location of the BLOB with 
the given job ID and key
         */
-       static File getStorageLocation(final File storageDir, final JobID 
jobID, final String key) {
+       static File getStorageLocation(File storageDir, JobID jobID, String 
key) {
                return new File(getJobDirectory(storageDir, jobID), 
BLOB_FILE_PREFIX + encodeKey(key));
        }
 
@@ -143,9 +150,12 @@ public class BlobUtils {
         *        the ID of the job to return the storage directory for
         * @return the storage directory for BLOBs belonging to the job with 
the given ID
         */
-       private static File getJobDirectory(final File storageDir, final JobID 
jobID){
+       private static File getJobDirectory(File storageDir, JobID jobID) {
                final File jobDirectory = new File(storageDir, JOB_DIR_PREFIX + 
jobID.toString());
-               jobDirectory.mkdirs();
+
+               if (!jobDirectory.exists() && !jobDirectory.mkdirs()) {
+                       throw new RuntimeException("Could not create jobId 
directory '" + jobDirectory.getAbsolutePath() + "'.");
+               }
 
                return jobDirectory;
        }
@@ -157,8 +167,7 @@ public class BlobUtils {
         *        the user's key for a BLOB
         * @return the internal name for the BLOB as used by the BLOB server
         */
-       private static String encodeKey(final String key) {
-
+       private static String encodeKey(String key) {
                return 
BaseEncoding.base64().encode(key.getBytes(DEFAULT_CHARSET));
        }
 
@@ -168,9 +177,8 @@ public class BlobUtils {
         * @param jobID
         *                      jobID whose directory shall be deleted
         */
-       static void deleteJobDirectory(final File storageDir, final JobID 
jobID) throws IOException {
+       static void deleteJobDirectory(File storageDir, JobID jobID) throws 
IOException {
                File directory = getJobDirectory(storageDir, jobID);
-
                FileUtils.deleteDirectory(directory);
        }
 
@@ -220,4 +228,94 @@ public class BlobUtils {
                        return null;
                }
        }
+
+       /**
+        * Auxiliary method to write the length of an upcoming data chunk to an
+        * output stream.
+        *
+        * @param length
+        *        the length of the upcoming data chunk in bytes
+        * @param outputStream
+        *        the output stream to write the length to
+        * @throws IOException
+        *         thrown if an I/O error occurs while writing to the output
+        *         stream
+        */
+       static void writeLength(int length, OutputStream outputStream) throws 
IOException {
+               byte[] buf = new byte[4];
+               buf[0] = (byte) (length & 0xff);
+               buf[1] = (byte) ((length >> 8) & 0xff);
+               buf[2] = (byte) ((length >> 16) & 0xff);
+               buf[3] = (byte) ((length >> 24) & 0xff);
+               outputStream.write(buf, 0, 4);
+       }
+
+       /**
+        * Auxiliary method to read the length of an upcoming data chunk from an
+        * input stream.
+        *
+        * @param inputStream
+        *        the input stream to read the length from
+        * @return the length of the upcoming data chunk in bytes
+        * @throws IOException
+        *         thrown if an I/O error occurs while reading from the input
+        *         stream
+        */
+       static int readLength(InputStream inputStream) throws IOException {
+               byte[] buf = new byte[4];
+               int bytesRead = 0;
+               while (bytesRead < 4) {
+                       final int read = inputStream.read(buf, bytesRead, 4 - 
bytesRead);
+                       if (read < 0) {
+                               throw new EOFException("Read an incomplete 
length");
+                       }
+                       bytesRead += read;
+               }
+
+               bytesRead = buf[0] & 0xff;
+               bytesRead |= (buf[1] & 0xff) << 8;
+               bytesRead |= (buf[2] & 0xff) << 16;
+               bytesRead |= (buf[3] & 0xff) << 24;
+
+               return bytesRead;
+       }
+
+       /**
+        * Auxiliary method to read a particular number of bytes from an input 
stream. This method blocks until the
+        * requested number of bytes have been read from the stream. If the 
stream cannot offer enough data, an
+        * {@link EOFException} is thrown.
+        *
+        * @param inputStream The input stream to read the data from.
+        * @param buf The buffer to store the read data.
+        * @param off The offset inside the buffer.
+        * @param len The number of bytes to read from the stream.
+        * @param type The name of the type, to throw a good error message in 
case of not enough data.
+        * @throws IOException
+        *         Thrown if I/O error occurs while reading from the stream or 
the stream cannot offer enough data.
+        */
+       static void readFully(InputStream inputStream, byte[] buf, int off, int 
len, String type) throws IOException {
+
+               int bytesRead = 0;
+               while (bytesRead < len) {
+
+                       final int read = inputStream.read(buf, off + bytesRead, 
len
+                                       - bytesRead);
+                       if (read < 0) {
+                               throw new EOFException("Received an incomplete 
" + type);
+                       }
+                       bytesRead += read;
+               }
+       }
+
+       static void closeSilently(Socket socket, Logger LOG) {
+               if (socket != null) {
+                       try {
+                               socket.close();
+                       } catch (Throwable t) {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Error while closing resource 
after BLOB transfer.", t);
+                               }
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java
index 7f7575b..ba9add5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/AbstractIDTest.java
@@ -22,10 +22,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import org.apache.flink.runtime.jobgraph.JobID;
 import org.junit.Test;
 
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 
+import java.nio.ByteBuffer;
+
 /**
  * This class contains tests for the {@link 
org.apache.flink.runtime.AbstractID} class.
  */
@@ -48,6 +51,45 @@ public class AbstractIDTest {
                        fail(e.getMessage());
                }
        }
+
+       @Test
+       public void testConvertToBytes() {
+               try {
+                       AbstractID origID = new AbstractID();
+
+                       AbstractID copy1 = new AbstractID(origID);
+                       AbstractID copy2 = new AbstractID(origID.getBytes());
+                       AbstractID copy3 = new 
AbstractID(origID.getLowerPart(), origID.getUpperPart());
+
+                       assertEquals(origID, copy1);
+                       assertEquals(origID, copy2);
+                       assertEquals(origID, copy3);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testConvertToByteBuffer() {
+               try {
+                       JobID origID = new JobID();
+
+                       byte[] bytes = origID.getBytes();
+                       ByteBuffer buffer = ByteBuffer.wrap(bytes);
+
+                       JobID copy1 = JobID.fromByteBuffer(buffer);
+                       JobID copy2 = JobID.fromByteArray(bytes);
+
+                       assertEquals(origID, copy1);
+                       assertEquals(origID, copy2);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
        
        @Test
        public void testCompare() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
new file mode 100644
index 0000000..aba0aff
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRetriesTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.URL;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for the blob cache retrying the connection to the server.
+ */
+public class BlobCacheRetriesTest {
+
+       /**
+        * A test where the connection fails twice and then the get operation 
succeeds.
+        */
+       @Test
+       public void testBlobFetchRetries() {
+
+               final byte[] data = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
+
+               BlobServer server = null;
+               BlobCache cache = null;
+               try {
+                       final Configuration config = new Configuration();
+
+                       server = new TestingFailingBlobServer(config, 2);
+
+                       final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+
+                       // upload some blob
+                       BlobClient blobClient = null;
+                       BlobKey key;
+                       try {
+                               blobClient = new BlobClient(serverAddress);
+
+                               key = blobClient.put(data);
+                       }
+                       finally {
+                               if (blobClient != null) {
+                                       blobClient.close();
+                               }
+                       }
+
+                       cache = new BlobCache(serverAddress, config);
+
+                       // trigger a download - it should fail on the first 
time, but retry, and succeed at the second time
+                       URL url = cache.getURL(key);
+                       InputStream is = url.openStream();
+                       try {
+                               byte[] received = new byte[data.length];
+                               assertEquals(data.length, is.read(received));
+                               assertArrayEquals(data, received);
+                       }
+                       finally {
+                               is.close();
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (cache != null) {
+                               cache.shutdown();
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+
+       /**
+        * A test where the connection fails too often and eventually fails the 
GET request.
+        */
+       @Test
+       public void testBlobFetchWithTooManyFailures() {
+
+               final byte[] data = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
+
+               BlobServer server = null;
+               BlobCache cache = null;
+               try {
+                       final Configuration config = new Configuration();
+
+                       server = new TestingFailingBlobServer(config, 10);
+
+                       final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+
+                       // upload some blob
+                       BlobClient blobClient = null;
+                       BlobKey key;
+                       try {
+                               blobClient = new BlobClient(serverAddress);
+
+                               key = blobClient.put(data);
+                       }
+                       finally {
+                               if (blobClient != null) {
+                                       blobClient.close();
+                               }
+                       }
+
+                       cache = new BlobCache(serverAddress, config);
+
+                       // trigger a download - it should fail eventually
+                       try {
+                               cache.getURL(key);
+                               fail("This should fail");
+                       }
+                       catch (IOException e) {
+                               // as we expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (cache != null) {
+                               cache.shutdown();
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
new file mode 100644
index 0000000..4b92b71
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheSuccessTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+/**
+ * This class contains unit tests for the {@link BlobCache}.
+ */
+public class BlobCacheSuccessTest {
+
+       @Test
+       public void testBlobCache() {
+
+               // First create two BLOBs and upload them to BLOB server
+               final byte[] buf = new byte[128];
+               final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2);
+
+               BlobServer blobServer = null;
+               BlobCache blobCache = null;
+               try {
+
+                       // Start the BLOB server
+                       blobServer = new BlobServer(new Configuration());
+                       final InetSocketAddress serverAddress = new 
InetSocketAddress(blobServer.getPort());
+
+                       // Upload BLOBs
+                       BlobClient blobClient = null;
+                       try {
+
+                               blobClient = new BlobClient(serverAddress);
+
+                               blobKeys.add(blobClient.put(buf));
+                               buf[0] = 1; // Make sure the BLOB key changes
+                               blobKeys.add(blobClient.put(buf));
+                       } finally {
+                               if (blobClient != null) {
+                                       blobClient.close();
+                               }
+                       }
+
+                       blobCache = new BlobCache(serverAddress, new 
Configuration());
+
+                       for(int i = 0; i < blobKeys.size(); i++){
+                               blobCache.getURL(blobKeys.get(i));
+                       }
+
+                       // Now, shut down the BLOB server, the BLOBs must still 
be accessible through the cache.
+                       blobServer.shutdown();
+                       blobServer = null;
+
+                       final URL[] urls = new URL[blobKeys.size()];
+
+                       for(int i = 0; i < blobKeys.size(); i++){
+                               urls[i] = blobCache.getURL(blobKeys.get(i));
+                       }
+
+                       // Verify the result
+                       assertEquals(blobKeys.size(), urls.length);
+
+                       for (int i = 0; i < urls.length; ++i) {
+
+                               final URL url = urls[i];
+
+                               assertNotNull(url);
+
+                               try {
+                                       final File cachedFile = new 
File(url.toURI());
+
+                                       assertTrue(cachedFile.exists());
+                                       assertEquals(buf.length, 
cachedFile.length());
+
+                               } catch (URISyntaxException e) {
+                                       fail(e.getMessage());
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (blobServer != null) {
+                               blobServer.shutdown();
+                       }
+
+                       if(blobCache != null){
+                               blobCache.shutdown();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
deleted file mode 100644
index 32c8c3a..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.blob;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.configuration.Configuration;
-import org.junit.Test;
-
-/**
- * This class contains unit tests for the {@link BlobCache}.
- */
-public class BlobCacheTest {
-
-       @Test
-       public void testBlobCache() {
-
-               // First create two BLOBs and upload them to BLOB server
-               final byte[] buf = new byte[128];
-               final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2);
-
-               BlobServer blobServer = null;
-               BlobCache blobCache = null;
-               try {
-
-                       // Start the BLOB server
-                       blobServer = new BlobServer(new Configuration());
-                       final InetSocketAddress serverAddress = new 
InetSocketAddress(blobServer.getServerPort());
-
-                       // Upload BLOBs
-                       BlobClient blobClient = null;
-                       try {
-
-                               blobClient = new BlobClient(serverAddress);
-
-                               blobKeys.add(blobClient.put(buf));
-                               buf[0] = 1; // Make sure the BLOB key changes
-                               blobKeys.add(blobClient.put(buf));
-                       } finally {
-                               if (blobClient != null) {
-                                       blobClient.close();
-                               }
-                       }
-
-                       blobCache = new BlobCache(serverAddress, new 
Configuration());
-
-                       for(int i = 0; i < blobKeys.size(); i++){
-                               blobCache.getURL(blobKeys.get(i));
-                       }
-
-                       // Now, shut down the BLOB server, the BLOBs must still 
be accessible through the cache.
-                       blobServer.shutdown();
-                       blobServer = null;
-
-                       final URL[] urls = new URL[blobKeys.size()];
-
-                       for(int i = 0; i < blobKeys.size(); i++){
-                               urls[i] = blobCache.getURL(blobKeys.get(i));
-                       }
-
-                       // Verify the result
-                       assertEquals(blobKeys.size(), urls.length);
-
-                       for (int i = 0; i < urls.length; ++i) {
-
-                               final URL url = urls[i];
-
-                               assertNotNull(url);
-
-                               try {
-                                       final File cachedFile = new 
File(url.toURI());
-
-                                       assertTrue(cachedFile.exists());
-                                       assertEquals(buf.length, 
cachedFile.length());
-
-                               } catch (URISyntaxException e) {
-                                       fail(e.getMessage());
-                               }
-
-                       }
-
-               } catch (IOException ioe) {
-                       fail(ioe.getMessage());
-               } finally {
-                       if (blobServer != null) {
-                               blobServer.shutdown();
-                       }
-
-                       if(blobCache != null){
-                               blobCache.shutdown();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 1465777..2254d7c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.fail;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -33,7 +32,6 @@ import java.security.MessageDigest;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.util.StringUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -43,14 +41,10 @@ import org.junit.Test;
  */
 public class BlobClientTest {
 
-       /**
-        * The buffer size used during the tests in bytes.
-        */
+       /** The buffer size used during the tests in bytes. */
        private static final int TEST_BUFFER_SIZE = 17 * 1000;
 
-       /**
-        * The instance of the BLOB server used during the tests.
-        */
+       /** The instance of the BLOB server used during the tests. */
        private static BlobServer BLOB_SERVER;
 
        /**
@@ -60,10 +54,11 @@ public class BlobClientTest {
        public static void startServer() {
                try {
                        BLOB_SERVER = new BlobServer(new Configuration());
-               } catch (IOException ioe) {
-                       fail(StringUtils.stringifyException(ioe));
                }
-
+               catch (IOException e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
        }
 
        /**
@@ -82,13 +77,10 @@ public class BlobClientTest {
         * @return a test buffer filled with a specific byte pattern
         */
        private static byte[] createTestBuffer() {
-
                final byte[] buf = new byte[TEST_BUFFER_SIZE];
-
                for (int i = 0; i < buf.length; ++i) {
                        buf[i] = (byte) (i % 128);
                }
-
                return buf;
        }
 
@@ -102,7 +94,7 @@ public class BlobClientTest {
         * @throws IOException
         *         thrown if an I/O error occurs while writing to the test file
         */
-       private static BlobKey prepareTestFile(final File file) throws 
IOException {
+       private static BlobKey prepareTestFile(File file) throws IOException {
 
                MessageDigest md = BlobUtils.createMessageDigest();
 
@@ -203,44 +195,44 @@ public class BlobClientTest {
        @Test
        public void testContentAddressableBuffer() {
 
-               final byte[] testBuffer = createTestBuffer();
-               final MessageDigest md = BlobUtils.createMessageDigest();
-               md.update(testBuffer);
-               final BlobKey origKey = new BlobKey(md.digest());
+               BlobClient client = null;
 
                try {
+                       byte[] testBuffer = createTestBuffer();
+                       MessageDigest md = BlobUtils.createMessageDigest();
+                       md.update(testBuffer);
+                       BlobKey origKey = new BlobKey(md.digest());
 
-                       BlobClient client = null;
-                       try {
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getPort());
+                       client = new BlobClient(serverAddress);
 
-                               final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getServerPort());
-                               client = new BlobClient(serverAddress);
+                       // Store the data
+                       BlobKey receivedKey = client.put(testBuffer);
+                       assertEquals(origKey, receivedKey);
 
-                               // Store the data
-                               final BlobKey receivedKey = 
client.put(testBuffer);
-                               assertEquals(origKey, receivedKey);
+                       // Retrieve the data
+                       InputStream is = client.get(receivedKey);
+                       validateGet(is, testBuffer);
 
-                               // Retrieve the data
-                               final InputStream is = client.get(receivedKey);
-                               validateGet(is, testBuffer);
-
-                               // Check reaction to invalid keys
+                       // Check reaction to invalid keys
+                       try {
+                               client.get(new BlobKey());
+                               fail("Expected IOException did not occur");
+                       }
+                       catch (IOException fnfe) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (client != null) {
                                try {
-                                       client.get(new BlobKey());
-                               } catch (FileNotFoundException fnfe) {
-                                       return;
-                               }
-
-                               fail("Expected FileNotFoundException did not 
occur");
-
-                       } finally {
-                               if (client != null) {
                                        client.close();
-                               }
+                               } catch (Throwable t) {}
                        }
-
-               } catch (IOException ioe) {
-                       fail(StringUtils.stringifyException(ioe));
                }
        }
 
@@ -250,42 +242,45 @@ public class BlobClientTest {
        @Test
        public void testContentAddressableStream() {
 
-               try {
+               BlobClient client = null;
+               InputStream is = null;
 
-                       final File testFile = File.createTempFile("testfile", 
".dat");
+               try {
+                       File testFile = File.createTempFile("testfile", ".dat");
                        testFile.deleteOnExit();
-                       final BlobKey origKey = prepareTestFile(testFile);
 
-                       BlobClient client = null;
-                       InputStream is = null;
-                       try {
-
-                               final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getServerPort());
-                               client = new BlobClient(serverAddress);
+                       BlobKey origKey = prepareTestFile(testFile);
 
-                               // Store the data
-                               is = new FileInputStream(testFile);
-                               final BlobKey receivedKey = client.put(is);
-                               assertEquals(origKey, receivedKey);
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getPort());
+                       client = new BlobClient(serverAddress);
 
-                               is.close();
-                               is = null;
+                       // Store the data
+                       is = new FileInputStream(testFile);
+                       BlobKey receivedKey = client.put(is);
+                       assertEquals(origKey, receivedKey);
 
-                               // Retrieve the data
-                               is = client.get(receivedKey);
-                               validateGet(is, testFile);
+                       is.close();
+                       is = null;
 
-                       } finally {
-                               if (is != null) {
+                       // Retrieve the data
+                       is = client.get(receivedKey);
+                       validateGet(is, testFile);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (is != null) {
+                               try {
                                        is.close();
-                               }
-                               if (client != null) {
+                               } catch (Throwable t) {}
+                       }
+                       if (client != null) {
+                               try {
                                        client.close();
-                               }
+                               } catch (Throwable t) {}
                        }
-
-               } catch (IOException ioe) {
-                       fail(StringUtils.stringifyException(ioe));
                }
        }
 
@@ -300,11 +295,9 @@ public class BlobClientTest {
                final String key = "testkey";
 
                try {
-
                        BlobClient client = null;
                        try {
-
-                               final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getServerPort());
+                               final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getPort());
                                client = new BlobClient(serverAddress);
 
                                // Store the data
@@ -320,20 +313,21 @@ public class BlobClientTest {
                                // Check if the BLOB is still available
                                try {
                                        client.get(jobID, key);
-                               } catch (FileNotFoundException fnfe) {
-                                       return;
+                                       fail("Expected IOException did not 
occur");
                                }
-
-                               fail("Expected FileNotFoundException did not 
occur");
-
-                       } finally {
+                               catch (IOException e) {
+                                       // expected
+                               }
+                       }
+                       finally {
                                if (client != null) {
                                        client.close();
                                }
                        }
-
-               } catch (IOException ioe) {
-                       fail(StringUtils.stringifyException(ioe));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
                }
        }
 
@@ -355,7 +349,7 @@ public class BlobClientTest {
                        InputStream is = null;
                        try {
 
-                               final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getServerPort());
+                               final InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", BLOB_SERVER.getPort());
                                client = new BlobClient(serverAddress);
 
                                // Store the data
@@ -369,7 +363,8 @@ public class BlobClientTest {
                                is = client.get(jobID, key);
                                validateGet(is, testFile);
 
-                       } finally {
+                       }
+                       finally {
                                if (is != null) {
                                        is.close();
                                }
@@ -378,8 +373,10 @@ public class BlobClientTest {
                                }
                        }
 
-               } catch (IOException ioe) {
-                       fail(StringUtils.stringifyException(ioe));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
new file mode 100644
index 0000000..adb3bfc
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Random;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests how DELETE requests behave.
+ */
+public class BlobServerDeleteTest {
+
+       private final Random rnd = new Random();
+
+       @Test
+       public void testDeleteSingle() {
+               BlobServer server = null;
+               BlobClient client = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       client = new BlobClient(serverAddress);
+
+                       byte[] data = new byte[2000000];
+                       rnd.nextBytes(data);
+
+                       // put content addressable (like libraries)
+                       BlobKey key = client.put(data);
+                       assertNotNull(key);
+
+                       // issue a DELETE request
+                       client.delete(key);
+                       client.close();
+
+                       client = new BlobClient(serverAddress);
+                       try {
+                               client.get(key);
+                               fail("BLOB should have been deleted");
+                       }
+                       catch (IOException e) {
+                               // expected
+                       }
+
+                       try {
+                               client.put(new byte[1]);
+                               fail("client should be closed after erroneous 
operation");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+
+       @Test
+       public void testDeleteAll() {
+               BlobServer server = null;
+               BlobClient client = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       client = new BlobClient(serverAddress);
+
+                       byte[] data = new byte[2000000];
+                       rnd.nextBytes(data);
+
+                       JobID jobID = new JobID();
+                       String name1 = "random name";
+                       String name2 = "any nyme";
+
+                       // put content addressable (like libraries)
+                       client.put(jobID, name1, data);
+                       client.put(jobID, name2, new byte[712]);
+
+
+                       // issue a DELETE ALL request
+                       client.deleteAll(jobID);
+                       client.close();
+
+                       client = new BlobClient(serverAddress);
+                       try {
+                               client.get(jobID, name1);
+                               fail("BLOB should have been deleted");
+                       }
+                       catch (IOException e) {
+                               // expected
+                       }
+
+                       try {
+                               client.put(new byte[1]);
+                               fail("client should be closed after erroneous 
operation");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+
+                       client = new BlobClient(serverAddress);
+                       try {
+                               client.get(jobID, name2);
+                               fail("BLOB should have been deleted");
+                       }
+                       catch (IOException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+
+       @Test
+       public void testDeleteAlreadyDeletedByBlobKey() {
+               BlobServer server = null;
+               BlobClient client = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       client = new BlobClient(serverAddress);
+
+                       byte[] data = new byte[2000000];
+                       rnd.nextBytes(data);
+
+                       // put content addressable (like libraries)
+                       BlobKey key = client.put(data);
+                       assertNotNull(key);
+
+                       File blobFile = server.getStorageLocation(key);
+                       assertTrue(blobFile.delete());
+
+                       // issue a DELETE request
+                       try {
+                               client.delete(key);
+                       }
+                       catch (IOException e) {
+                               fail("DELETE operation should not fail if file 
is already deleted");
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+
+       @Test
+       public void testDeleteAlreadyDeletedByName() {
+               BlobServer server = null;
+               BlobClient client = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       client = new BlobClient(serverAddress);
+
+                       byte[] data = new byte[2000000];
+                       rnd.nextBytes(data);
+
+                       JobID jid = new JobID();
+                       String name = "------------fdghljEgRJHF+##4U789Q345";
+
+                       client.put(jid, name, data);
+
+                       File blobFile = server.getStorageLocation(jid, name);
+                       assertTrue(blobFile.delete());
+
+                       // issue a DELETE request
+                       try {
+                               client.delete(jid, name);
+                       }
+                       catch (IOException e) {
+                               fail("DELETE operation should not fail if file 
is already deleted");
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+
+       @Test
+       public void testDeleteFails() {
+               BlobServer server = null;
+               BlobClient client = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       client = new BlobClient(serverAddress);
+
+                       byte[] data = new byte[2000000];
+                       rnd.nextBytes(data);
+
+                       // put content addressable (like libraries)
+                       BlobKey key = client.put(data);
+                       assertNotNull(key);
+
+                       File blobFile = server.getStorageLocation(key);
+                       File directory = blobFile.getParentFile();
+
+                       assertTrue(blobFile.setWritable(false, false));
+                       assertTrue(directory.setWritable(false, false));
+
+                       // issue a DELETE request
+                       try {
+                               client.delete(key);
+                               fail("DELETE operation should fail if file 
cannot be deleted");
+                       }
+                       catch (IOException e) {
+                               // expected
+                       }
+                       finally {
+                               blobFile.setWritable(true, false);
+                               directory.setWritable(true, false);
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
new file mode 100644
index 0000000..c564670
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests how failing GET requests behave in the presence of failures.
+ * Successful GET requests are tested in conjunction wit the PUT
+ * requests.
+ */
+public class BlobServerGetTest {
+
+       private final Random rnd = new Random();
+
+       @Test
+       public void testGetFailsDuringLookup() {
+               BlobServer server = null;
+               BlobClient client = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       client = new BlobClient(serverAddress);
+
+                       byte[] data = new byte[2000000];
+                       rnd.nextBytes(data);
+
+                       // put content addressable (like libraries)
+                       BlobKey key = client.put(data);
+                       assertNotNull(key);
+
+                       // delete all files to make sure that GET requests fail
+                       File blobFile = server.getStorageLocation(key);
+                       assertTrue(blobFile.delete());
+
+                       // issue a GET request that fails
+                       try {
+                               client.get(key);
+                               fail("This should not succeed.");
+                       }
+                       catch (IOException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+
+       @Test
+       public void testGetFailsDuringStreaming() {
+               BlobServer server = null;
+               BlobClient client = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       client = new BlobClient(serverAddress);
+
+                       byte[] data = new byte[5000000];
+                       rnd.nextBytes(data);
+
+                       // put content addressable (like libraries)
+                       BlobKey key = client.put(data);
+                       assertNotNull(key);
+
+                       // issue a GET request that succeeds
+                       InputStream is = client.get(key);
+
+                       byte[] receiveBuffer = new byte[50000];
+                       BlobUtils.readFully(is, receiveBuffer, 0, 
receiveBuffer.length, null);
+                       BlobUtils.readFully(is, receiveBuffer, 0, 
receiveBuffer.length, null);
+
+                       // shut down the server
+                       for (BlobServerConnection conn : 
server.getCurrentyActiveConnections()) {
+                               conn.close();
+                       }
+
+                       try {
+                               byte[] remainder = new byte[data.length - 
2*receiveBuffer.length];
+                               BlobUtils.readFully(is, remainder, 0, 
remainder.length, null);
+                               fail();
+                       }
+                       catch (IOException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
new file mode 100644
index 0000000..1f8d29b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for successful and failing PUT operations against the BLOB server,
+ * and successful GET operations.
+ */
+public class BlobServerPutTest {
+
+       private final Random rnd = new Random();
+
+       @Test
+       public void testPutBufferSuccessful() {
+               BlobServer server = null;
+               BlobClient client = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       client = new BlobClient(serverAddress);
+
+                       byte[] data = new byte[2000000];
+                       rnd.nextBytes(data);
+
+                       // put content addressable (like libraries)
+                       BlobKey key1 = client.put(data);
+                       assertNotNull(key1);
+
+                       BlobKey key2 = client.put(data, 10, 44);
+                       assertNotNull(key2);
+
+                       // put under job and name scope
+                       JobID jid = new JobID();
+                       String stringKey = "my test key";
+                       client.put(jid, stringKey, data);
+
+                       // --- GET the data and check that it is equal ---
+
+                       // one get request on the same client
+                       InputStream is1 = client.get(key2);
+                       byte[] result1 = new byte[44];
+                       BlobUtils.readFully(is1, result1, 0, result1.length, 
null);
+                       is1.close();
+
+                       for (int i = 0, j = 10; i < result1.length; i++, j++) {
+                               assertEquals(data[j], result1[i]);
+                       }
+
+                       // close the client and create a new one for the 
remaining requests
+                       client.close();
+                       client = new BlobClient(serverAddress);
+
+                       InputStream is2 = client.get(key1);
+                       byte[] result2 = new byte[data.length];
+                       BlobUtils.readFully(is2, result2, 0, result2.length, 
null);
+                       is2.close();
+                       assertArrayEquals(data, result2);
+
+                       InputStream is3 = client.get(jid, stringKey);
+                       byte[] result3 = new byte[data.length];
+                       BlobUtils.readFully(is3, result3, 0, result3.length, 
null);
+                       is3.close();
+                       assertArrayEquals(data, result3);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+
+
+       @Test
+       public void testPutStreamSuccessful() {
+               BlobServer server = null;
+               BlobClient client = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       client = new BlobClient(serverAddress);
+
+                       byte[] data = new byte[2000000];
+                       rnd.nextBytes(data);
+
+                       // put content addressable (like libraries)
+                       {
+                               BlobKey key1 = client.put(new 
ByteArrayInputStream(data));
+                               assertNotNull(key1);
+
+                       }
+
+                       // put under job and name scope
+                       {
+                               JobID jid = new JobID();
+                               String stringKey = "my test key";
+                               client.put(jid, stringKey, new 
ByteArrayInputStream(data));
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+
+       @Test
+       public void testPutChunkedStreamSuccessful() {
+               BlobServer server = null;
+               BlobClient client = null;
+
+               try {
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       client = new BlobClient(serverAddress);
+
+                       byte[] data = new byte[2000000];
+                       rnd.nextBytes(data);
+
+                       // put content addressable (like libraries)
+                       {
+                               BlobKey key1 = client.put(new 
ChunkedInputStream(data, 19));
+                               assertNotNull(key1);
+
+                       }
+
+                       // put under job and name scope
+                       {
+                               JobID jid = new JobID();
+                               String stringKey = "my test key";
+                               client.put(jid, stringKey, new 
ChunkedInputStream(data, 17));
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+
+       @Test
+       public void testPutBufferFails() {
+               BlobServer server = null;
+               BlobClient client = null;
+
+               File tempFileDir = null;
+               try {
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
+
+                       // make sure the blob server cannot create any files in 
its storage dir
+                       tempFileDir = 
server.createTemporaryFilename().getParentFile().getParentFile();
+                       assertTrue(tempFileDir.setExecutable(true, false));
+                       assertTrue(tempFileDir.setReadable(true, false));
+                       assertTrue(tempFileDir.setWritable(false, false));
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       client = new BlobClient(serverAddress);
+
+                       byte[] data = new byte[2000000];
+                       rnd.nextBytes(data);
+
+                       // put content addressable (like libraries)
+                       try {
+                               client.put(data);
+                               fail("This should fail.");
+                       }
+                       catch (IOException e) {
+                               assertTrue(e.getMessage(), 
e.getMessage().contains("Server side error"));
+                       }
+
+                       try {
+                               client.put(data);
+                               fail("Client should be closed");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       // set writable again to make sure we can remove the 
directory
+                       if (tempFileDir != null) {
+                               tempFileDir.setWritable(true, false);
+                       }
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+
+       @Test
+       public void testPutNamedBufferFails() {
+               BlobServer server = null;
+               BlobClient client = null;
+
+               File tempFileDir = null;
+               try {
+                       Configuration config = new Configuration();
+                       server = new BlobServer(config);
+
+                       // make sure the blob server cannot create any files in 
its storage dir
+                       tempFileDir = 
server.createTemporaryFilename().getParentFile().getParentFile();
+                       assertTrue(tempFileDir.setExecutable(true, false));
+                       assertTrue(tempFileDir.setReadable(true, false));
+                       assertTrue(tempFileDir.setWritable(false, false));
+
+                       InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", server.getPort());
+                       client = new BlobClient(serverAddress);
+
+                       byte[] data = new byte[2000000];
+                       rnd.nextBytes(data);
+
+                       // put under job and name scope
+                       try {
+                               JobID jid = new JobID();
+                               String stringKey = "my test key";
+                               client.put(jid, stringKey, data);
+                               fail("This should fail.");
+                       }
+                       catch (IOException e) {
+                               assertTrue(e.getMessage(), 
e.getMessage().contains("Server side error"));
+                       }
+
+                       try {
+                               JobID jid = new JobID();
+                               String stringKey = "another key";
+                               client.put(jid, stringKey, data);
+                               fail("Client should be closed");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       // set writable again to make sure we can remove the 
directory
+                       if (tempFileDir != null) {
+                               tempFileDir.setWritable(true, false);
+                       }
+                       if (client != null) {
+                               try {
+                                       client.close();
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                               }
+                       }
+                       if (server != null) {
+                               server.shutdown();
+                       }
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private static final class ChunkedInputStream extends InputStream {
+
+               private final byte[][] data;
+
+               private int x = 0, y = 0;
+
+
+               private ChunkedInputStream(byte[] data, int numChunks) {
+                       this.data = new byte[numChunks][];
+
+                       int bytesPerChunk = data.length / numChunks;
+                       int bytesTaken = 0;
+                       for (int i = 0; i < numChunks - 1; i++, bytesTaken += 
bytesPerChunk) {
+                               this.data[i] = new byte[bytesPerChunk];
+                               System.arraycopy(data, bytesTaken, 
this.data[i], 0, bytesPerChunk);
+                       }
+
+                       this.data[numChunks -  1] = new byte[data.length - 
bytesTaken];
+                       System.arraycopy(data, bytesTaken, this.data[numChunks 
-  1], 0, this.data[numChunks -  1].length);
+               }
+
+               @Override
+               public int read() {
+                       if (x < data.length) {
+                               byte[] curr = data[x];
+                               if (y < curr.length) {
+                                       byte next = curr[y];
+                                       y++;
+                                       return next;
+                               }
+                               else {
+                                       y = 0;
+                                       x++;
+                                       return read();
+                               }
+                       } else {
+                               return -1;
+                       }
+               }
+
+               @Override
+               public int read(byte[] b, int off, int len) throws IOException {
+                       if (len == 0) {
+                               return 0;
+                       }
+                       if (x < data.length) {
+                               byte[] curr = data[x];
+                               if (y < curr.length) {
+                                       int toCopy = Math.min(len, curr.length 
- y);
+                                       System.arraycopy(curr, y, b, off, 
toCopy);
+                                       y += toCopy;
+                                       return toCopy;
+                               } else {
+                                       y = 0;
+                                       x++;
+                                       return read(b, off, len);
+                               }
+                       }
+                       else {
+                               return -1;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
new file mode 100644
index 0000000..93f9b73
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/TestingFailingBlobServer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+public class TestingFailingBlobServer extends BlobServer {
+
+       private int numFailures;
+
+       public TestingFailingBlobServer(Configuration config, int numFailures) 
throws IOException {
+               super(config);
+               this.numFailures = numFailures;
+       }
+
+       @Override
+       public void run() {
+
+               // we do properly the first operation (PUT)
+               try {
+                       new BlobServerConnection(getServerSocket().accept(), 
this).start();
+               }
+               catch (Throwable t) {
+                       t.printStackTrace();
+               }
+
+               // do some failing operations
+               for (int num = 0; num < numFailures && !isShutdown(); num++) {
+                       Socket socket = null;
+                       try {
+                               socket = getServerSocket().accept();
+                               InputStream is = socket.getInputStream();
+                               OutputStream os = socket.getOutputStream();
+
+                               // just abort everything
+                               is.close();
+                               os.close();
+                               socket.close();
+                       }
+                       catch (IOException e) {
+                       }
+                       finally {
+                               if (socket != null) {
+                                       try {
+                                               socket.close();
+                                       } catch(Throwable t) {}
+                               }
+                       }
+               }
+
+               // regular runs
+               super.run();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b5e1c5a1/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 5029b15..1d2b6ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -52,7 +52,7 @@ public class BlobLibraryCacheManagerTest {
 
                try {
                        server = new BlobServer(new Configuration());
-                       InetSocketAddress blobSocketAddress = new 
InetSocketAddress(server.getServerPort());
+                       InetSocketAddress blobSocketAddress = new 
InetSocketAddress(server.getPort());
                        BlobClient bc = new BlobClient(blobSocketAddress);
 
                        keys.add(bc.put(buf));

Reply via email to