Repository: flink
Updated Branches:
  refs/heads/master b2b94632d -> f853f3359


[FLINK-7140][blob] add an additional random component into the BlobKey

This should guard us against conflicts when the same file is uploaded (and
deleted) more than once, and also against hash collisions.

This closes #4359.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f853f335
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f853f335
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f853f335

Branch: refs/heads/master
Commit: f853f33593373f75a72351f5256564533bc063f9
Parents: b2b9463
Author: Nico Kruber <[email protected]>
Authored: Thu Oct 5 09:33:27 2017 +0200
Committer: Till <[email protected]>
Committed: Mon Oct 16 09:23:37 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/blob/AbstractBlobCache.java   |  18 +++-
 .../apache/flink/runtime/blob/BlobClient.java   |  12 ++-
 .../org/apache/flink/runtime/blob/BlobKey.java  | 100 +++++++++++++++++--
 .../apache/flink/runtime/blob/BlobServer.java   |  66 +++++++++---
 .../runtime/blob/BlobServerConnection.java      |  18 ++--
 .../apache/flink/runtime/blob/BlobUtils.java    |  12 +--
 .../flink/runtime/blob/PermanentBlobKey.java    |  12 +++
 .../flink/runtime/blob/TransientBlobKey.java    |  12 +++
 .../handler/legacy/TaskManagerLogHandler.java   |   3 +
 .../flink/runtime/blob/BlobCacheDeleteTest.java |  24 +++--
 .../flink/runtime/blob/BlobCacheGetTest.java    |  33 +++---
 .../flink/runtime/blob/BlobCachePutTest.java    |  37 +++++--
 .../runtime/blob/BlobCacheRecoveryTest.java     |  12 +--
 .../flink/runtime/blob/BlobClientTest.java      |  53 +++++-----
 .../apache/flink/runtime/blob/BlobKeyTest.java  |  84 +++++++++++++---
 .../runtime/blob/BlobServerDeleteTest.java      |  21 ++--
 .../flink/runtime/blob/BlobServerGetTest.java   |  19 ++--
 .../flink/runtime/blob/BlobServerPutTest.java   |  33 ++++--
 .../runtime/blob/BlobServerRecoveryTest.java    |   7 +-
 19 files changed, 405 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
index dc031e0..729ac9a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
@@ -159,8 +159,13 @@ public abstract class AbstractBlobCache implements 
Closeable {
                        try {
                                if (blobView.get(jobId, blobKey, incomingFile)) 
{
                                        // now move the temp file to our local 
cache atomically
-                                       BlobUtils.moveTempFileToStore(
-                                               incomingFile, jobId, blobKey, 
localFile, readWriteLock.writeLock(), log, null);
+                                       readWriteLock.writeLock().lock();
+                                       try {
+                                               BlobUtils.moveTempFileToStore(
+                                                       incomingFile, jobId, 
blobKey, localFile, log, null);
+                                       } finally {
+                                               
readWriteLock.writeLock().unlock();
+                                       }
 
                                        return localFile;
                                }
@@ -172,8 +177,13 @@ public abstract class AbstractBlobCache implements 
Closeable {
                        BlobClient.downloadFromBlobServer(
                                jobId, blobKey, incomingFile, serverAddress, 
blobClientConfig, numFetchRetries);
 
-                       BlobUtils.moveTempFileToStore(
-                               incomingFile, jobId, blobKey, localFile, 
readWriteLock.writeLock(), log, null);
+                       readWriteLock.writeLock().lock();
+                       try {
+                               BlobUtils.moveTempFileToStore(
+                                       incomingFile, jobId, blobKey, 
localFile, log, null);
+                       } finally {
+                               readWriteLock.writeLock().unlock();
+                       }
 
                        return localFile;
                } finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 3154f69..fbcce58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -47,9 +47,11 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.security.MessageDigest;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
 import static 
org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
@@ -57,7 +59,6 @@ import static 
org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CON
 import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobUtils.readFully;
 import static org.apache.flink.runtime.blob.BlobUtils.readLength;
 import static org.apache.flink.runtime.blob.BlobUtils.writeLength;
@@ -496,13 +497,16 @@ public final class BlobClient implements Closeable {
                else if (response == RETURN_OKAY) {
 
                        BlobKey remoteKey = BlobKey.readFromInputStream(is);
-                       BlobKey localKey = BlobKey.createKey(blobType, 
md.digest());
+                       byte[] localHash = md.digest();
 
-                       if (!localKey.equals(remoteKey)) {
+                       if (blobType != remoteKey.getType()) {
+                               throw new IOException("Detected data corruption 
during transfer");
+                       }
+                       if (!Arrays.equals(localHash, remoteKey.getHash())) {
                                throw new IOException("Detected data corruption 
during transfer");
                        }
 
-                       return localKey;
+                       return remoteKey;
                }
                else if (response == RETURN_ERROR) {
                        Throwable cause = readExceptionFromStream(is);

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
index 0aa45e1..ef2d64d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.StringUtils;
 
 import java.io.EOFException;
@@ -41,7 +42,7 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
        private static final long serialVersionUID = 3847117712521785209L;
 
        /** Size of the internal BLOB key in bytes. */
-       private static final int SIZE = 20;
+       public static final int SIZE = 20;
 
        /** The byte buffer storing the actual key data. */
        private final byte[] key;
@@ -68,6 +69,11 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
        }
 
        /**
+        * Random component of the key.
+        */
+       private final AbstractID random;
+
+       /**
         * Constructs a new BLOB key.
         *
         * @param type
@@ -76,6 +82,7 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
        protected BlobKey(BlobType type) {
                this.type = checkNotNull(type);
                this.key = new byte[SIZE];
+               this.random = new AbstractID();
        }
 
        /**
@@ -87,13 +94,33 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
         *        the actual key data
         */
        protected BlobKey(BlobType type, byte[] key) {
+               if (key == null || key.length != SIZE) {
+                       throw new IllegalArgumentException("BLOB key must have 
a size of " + SIZE + " bytes");
+               }
+
                this.type = checkNotNull(type);
+               this.key = key;
+               this.random = new AbstractID();
+       }
 
+       /**
+        * Constructs a new BLOB key from the given byte array.
+        *
+        * @param type
+        *              whether the referenced BLOB is permanent or transient
+        * @param key
+        *        the actual key data
+        * @param random
+        *        the random component of the key
+        */
+       protected BlobKey(BlobType type, byte[] key, byte[] random) {
                if (key == null || key.length != SIZE) {
                        throw new IllegalArgumentException("BLOB key must have 
a size of " + SIZE + " bytes");
                }
 
+               this.type = checkNotNull(type);
                this.key = key;
+               this.random = new AbstractID(random);
        }
 
        /**
@@ -107,10 +134,10 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
        @VisibleForTesting
        static BlobKey createKey(BlobType type) {
                if (type == PERMANENT_BLOB) {
-            return new PermanentBlobKey();
-        } else {
+                       return new PermanentBlobKey();
+               } else {
                        return new TransientBlobKey();
-        }
+               }
        }
 
        /**
@@ -125,10 +152,30 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
         */
        static BlobKey createKey(BlobType type, byte[] key) {
                if (type == PERMANENT_BLOB) {
-            return new PermanentBlobKey(key);
-        } else {
+                       return new PermanentBlobKey(key);
+               } else {
                        return new TransientBlobKey(key);
-        }
+               }
+       }
+
+       /**
+        * Returns the right {@link BlobKey} subclass for the given parameters.
+        *
+        * @param type
+        *              whether the referenced BLOB is permanent or transient
+        * @param key
+        *        the actual key data
+        * @param random
+        *        the random component of the key
+        *
+        * @return BlobKey subclass
+        */
+       static BlobKey createKey(BlobType type, byte[] key, byte[] random) {
+               if (type == PERMANENT_BLOB) {
+                       return new PermanentBlobKey(key, random);
+               } else {
+                       return new TransientBlobKey(key, random);
+               }
        }
 
        /**
@@ -141,6 +188,15 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
        }
 
        /**
+        * Returns the (internal) BLOB type which is reflected by the 
inheriting sub-class.
+        *
+        * @return BLOB type, i.e. permanent or transient
+        */
+       BlobType getType() {
+               return type;
+       }
+
+       /**
         * Adds the BLOB key to the given {@link MessageDigest}.
         * 
         * @param md
@@ -159,13 +215,16 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
 
                final BlobKey bk = (BlobKey) obj;
 
-               return Arrays.equals(this.key, bk.key) && this.type == bk.type;
+               return Arrays.equals(this.key, bk.key) &&
+                       this.type == bk.type &&
+                       this.random.equals(bk.random);
        }
 
        @Override
        public int hashCode() {
                int result = Arrays.hashCode(this.key);
                result = 37 * result + this.type.hashCode();
+               result = 37 * result + this.random.hashCode();
                return result;
        }
 
@@ -183,7 +242,7 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
                                // this actually never happens!
                                throw new IllegalStateException("Invalid BLOB 
type");
                }
-               return typeString + StringUtils.byteToHexString(this.key);
+               return typeString + StringUtils.byteToHexString(this.key) + "-" 
+ random.toString();
        }
 
        @Override
@@ -203,7 +262,13 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
 
                if (aarr.length == barr.length) {
                        // same hash contents - compare the BLOB types
-                       return this.type.compareTo(o.type);
+                       int typeCompare = this.type.compareTo(o.type);
+                       if (typeCompare == 0) {
+                               // same type - compare random components
+                               return this.random.compareTo(o.random);
+                       } else {
+                               return typeCompare;
+                       }
                } else {
                        return aarr.length - barr.length;
                }
@@ -223,6 +288,7 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
        static BlobKey readFromInputStream(InputStream inputStream) throws 
IOException {
 
                final byte[] key = new byte[BlobKey.SIZE];
+               final byte[] random = new byte[AbstractID.SIZE];
 
                int bytesRead = 0;
                // read key
@@ -233,6 +299,7 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
                        }
                        bytesRead += read;
                }
+
                // read BLOB type
                final BlobType blobType;
                {
@@ -248,7 +315,17 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
                        }
                }
 
-               return createKey(blobType, key);
+               // read random component
+               bytesRead = 0;
+               while (bytesRead < AbstractID.SIZE) {
+                       final int read = inputStream.read(random, bytesRead, 
AbstractID.SIZE - bytesRead);
+                       if (read < 0) {
+                               throw new EOFException("Read an incomplete BLOB 
key");
+                       }
+                       bytesRead += read;
+               }
+
+               return createKey(blobType, key, random);
        }
 
        /**
@@ -262,5 +339,6 @@ abstract class BlobKey implements Serializable, 
Comparable<BlobKey> {
        void writeToOutputStream(final OutputStream outputStream) throws 
IOException {
                outputStream.write(this.key);
                outputStream.write(this.type.ordinal());
+               outputStream.write(this.random.getBytes());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 7804dfd..bc61ef7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -474,8 +474,13 @@ public class BlobServer extends Thread implements 
BlobService, PermanentBlobServ
                                incomingFile = createTemporaryFilename();
                                blobStore.get(jobId, blobKey, incomingFile);
 
-                               BlobUtils.moveTempFileToStore(
-                                       incomingFile, jobId, blobKey, 
localFile, readWriteLock.writeLock(), LOG, null);
+                               readWriteLock.writeLock().lock();
+                               try {
+                                       BlobUtils.moveTempFileToStore(
+                                               incomingFile, jobId, blobKey, 
localFile, LOG, null);
+                               } finally {
+                                       readWriteLock.writeLock().unlock();
+                               }
 
                                return;
                        } finally {
@@ -586,10 +591,8 @@ public class BlobServer extends Thread implements 
BlobService, PermanentBlobServ
                        md.update(value);
                        fos.write(value);
 
-                       blobKey = BlobKey.createKey(blobType, md.digest());
-
                        // persist file
-                       moveTempFileToStore(incomingFile, jobId, blobKey);
+                       blobKey = moveTempFileToStore(incomingFile, jobId, 
md.digest(), blobType);
 
                        return blobKey;
                } finally {
@@ -642,10 +645,8 @@ public class BlobServer extends Thread implements 
BlobService, PermanentBlobServ
                                md.update(buf, 0, bytesRead);
                        }
 
-                       blobKey = BlobKey.createKey(blobType, md.digest());
-
                        // persist file
-                       moveTempFileToStore(incomingFile, jobId, blobKey);
+                       blobKey = moveTempFileToStore(incomingFile, jobId, 
md.digest(), blobType);
 
                        return blobKey;
                } finally {
@@ -665,20 +666,53 @@ public class BlobServer extends Thread implements 
BlobService, PermanentBlobServ
         *              temporary file created during transfer
         * @param jobId
         *              ID of the job this blob belongs to or <tt>null</tt> if 
job-unrelated
-        * @param blobKey
-        *              BLOB key identifying the file
+        * @param digest
+        *              BLOB content digest, i.e. hash
+        * @param blobType
+        *              whether this file is a permanent or transient BLOB
+        *
+        * @return unique BLOB key that identifies the BLOB on the server
         *
         * @throws IOException
         *              thrown if an I/O error occurs while moving the file or 
uploading it to the HA store
         */
-       void moveTempFileToStore(
-                       File incomingFile, @Nullable JobID jobId, BlobKey 
blobKey) throws IOException {
+       BlobKey moveTempFileToStore(
+                       File incomingFile, @Nullable JobID jobId, byte[] 
digest, BlobKey.BlobType blobType)
+                       throws IOException {
+
+               int retries = 10;
 
-               File storageFile = BlobUtils.getStorageLocation(storageDir, 
jobId, blobKey);
+               int attempt = 0;
+               while (true) {
+                       // add unique component independent of the BLOB content
+                       BlobKey blobKey = BlobKey.createKey(blobType, digest);
+                       File storageFile = 
BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
 
-               BlobUtils.moveTempFileToStore(
-                       incomingFile, jobId, blobKey, storageFile, 
readWriteLock.writeLock(), LOG,
-                       blobKey instanceof PermanentBlobKey ? blobStore : null);
+                       // try again until the key is unique (put the existence 
check into the lock!)
+                       readWriteLock.writeLock().lock();
+                       try {
+                               if (!storageFile.exists()) {
+                                       BlobUtils.moveTempFileToStore(
+                                               incomingFile, jobId, blobKey, 
storageFile, LOG,
+                                               blobKey instanceof 
PermanentBlobKey ? blobStore : null);
+                                       return blobKey;
+                               }
+                       } finally {
+                               readWriteLock.writeLock().unlock();
+                       }
+
+                       ++attempt;
+                       if (attempt >= retries) {
+                               String message = "Failed to find a unique key 
for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + 
".";
+                               LOG.error(message + " No retries left.");
+                               throw new IOException(message);
+                       } else {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Trying to find a unique key 
for BLOB of job {} (retry {}, last tried {})",
+                                               jobId, attempt, 
storageFile.getAbsolutePath());
+                               }
+                       }
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index be62581..fa8427e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -37,6 +37,8 @@ import java.security.MessageDigest;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
+import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION;
 import static 
org.apache.flink.runtime.blob.BlobServerProtocol.JOB_RELATED_CONTENT;
@@ -44,8 +46,6 @@ import static 
org.apache.flink.runtime.blob.BlobServerProtocol.JOB_UNRELATED_CON
 import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR;
 import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
-import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobUtils.closeSilently;
 import static org.apache.flink.runtime.blob.BlobUtils.readFully;
 import static org.apache.flink.runtime.blob.BlobUtils.readLength;
@@ -346,9 +346,9 @@ class BlobServerConnection extends Thread {
                        }
 
                        incomingFile = blobServer.createTemporaryFilename();
-                       BlobKey blobKey = readFileFully(inputStream, 
incomingFile, buf, blobType);
+                       byte[] digest = readFileFully(inputStream, 
incomingFile, buf);
 
-                       blobServer.moveTempFileToStore(incomingFile, jobId, 
blobKey);
+                       BlobKey blobKey = 
blobServer.moveTempFileToStore(incomingFile, jobId, digest, blobType);
 
                        // Return computed key to client for validation
                        outputStream.write(RETURN_OKAY);
@@ -387,16 +387,14 @@ class BlobServerConnection extends Thread {
         *              file to write to
         * @param buf
         *              An auxiliary buffer for data 
serialization/deserialization
-        * @param blobType
-        *              whether to make the data permanent or transient
         *
-        * @return the received file's content hash as a BLOB key
+        * @return the received file's content hash
         *
         * @throws IOException
         *              thrown if an I/O error occurs while reading/writing 
data from/to the respective streams
         */
-       private static BlobKey readFileFully(
-                       final InputStream inputStream, final File incomingFile, 
final byte[] buf, BlobKey.BlobType blobType)
+       private static byte[] readFileFully(
+                       final InputStream inputStream, final File incomingFile, 
final byte[] buf)
                        throws IOException {
                MessageDigest md = BlobUtils.createMessageDigest();
 
@@ -417,7 +415,7 @@ class BlobServerConnection extends Thread {
 
                                md.update(buf, 0, bytesExpected);
                        }
-                       return BlobKey.createKey(blobType, md.digest());
+                       return md.digest();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index d8223c8..04f2cdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
@@ -42,7 +43,6 @@ import java.nio.file.Files;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.UUID;
-import java.util.concurrent.locks.Lock;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
@@ -422,7 +422,7 @@ public class BlobUtils {
 
        /**
         * Moves the temporary <tt>incomingFile</tt> to its permanent location 
where it is available for
-        * use.
+        * use (not thread-safe!).
         *
         * @param incomingFile
         *              temporary file created during transfer
@@ -432,8 +432,6 @@ public class BlobUtils {
         *              BLOB key identifying the file
         * @param storageFile
         *      (local) file where the blob is/should be stored
-        * @param writeLock
-        *      lock to acquire before doing the move
         * @param log
         *      logger for debug information
         * @param blobStore
@@ -444,9 +442,7 @@ public class BlobUtils {
         */
        static void moveTempFileToStore(
                        File incomingFile, @Nullable JobID jobId, BlobKey 
blobKey, File storageFile,
-                       Lock writeLock, Logger log, @Nullable BlobStore 
blobStore) throws IOException {
-
-               writeLock.lock();
+                       Logger log, @Nullable BlobStore blobStore) throws 
IOException {
 
                try {
                        // first check whether the file already exists
@@ -483,8 +479,6 @@ public class BlobUtils {
                        if (incomingFile != null && !incomingFile.delete() && 
incomingFile.exists()) {
                                log.warn("Could not delete the staging file {} 
for blob key {} and job {}.", incomingFile, blobKey, jobId);
                        }
-
-                       writeLock.unlock();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
index 2ad8f72..40732fa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobKey.java
@@ -42,4 +42,16 @@ public final class PermanentBlobKey extends BlobKey {
        PermanentBlobKey(byte[] key) {
                super(BlobType.PERMANENT_BLOB, key);
        }
+
+       /**
+        * Constructs a new BLOB key from the given byte array.
+        *
+        * @param key
+        *        the actual key data
+        * @param random
+        *        the random component of the key
+        */
+       PermanentBlobKey(byte[] key, byte[] random) {
+               super(BlobType.PERMANENT_BLOB, key, random);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
index 43e0f5f..15a9637 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobKey.java
@@ -42,4 +42,16 @@ public final class TransientBlobKey extends BlobKey {
        TransientBlobKey(byte[] key) {
                super(BlobType.TRANSIENT_BLOB, key);
        }
+
+       /**
+        * Constructs a new BLOB key from the given byte array.
+        *
+        * @param key
+        *        the actual key data
+        * @param random
+        *        the random component of the key
+        */
+       TransientBlobKey(byte[] key, byte[] random) {
+               super(BlobType.TRANSIENT_BLOB, key, random);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
index 54725e1..cf5bfcb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -190,6 +190,9 @@ public class TaskManagerLogHandler extends 
RedirectHandler<JobManagerGateway> im
                                                        //delete previous log 
file, if it is different than the current one
                                                        HashMap<String, 
TransientBlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? 
lastSubmittedLog : lastSubmittedStdout;
                                                        if 
(lastSubmittedFile.containsKey(taskManagerID)) {
+                                                               // the BlobKey 
will almost certainly be different but the old file
+                                                               // may not 
exist anymore so we cannot rely on it and need to
+                                                               // download the 
new file anyway, even if the hashes match
                                                                if 
(!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerID))) {
                                                                        if 
(!blobCache.deleteFromCache(lastSubmittedFile.get(taskManagerID))) {
                                                                                
throw new CompletionException(new FlinkException("Could not delete file for " + 
taskManagerID + '.'));

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
index a83d100..16ba020 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheDeleteTest.java
@@ -51,7 +51,6 @@ import static 
org.apache.flink.runtime.blob.BlobServerDeleteTest.delete;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -87,6 +86,12 @@ public class BlobCacheDeleteTest extends TestLogger {
                testDelete(new JobID(), new JobID());
        }
 
+       @Test
+       public void testDeleteTransient5() throws IOException {
+               JobID jobId = new JobID();
+               testDelete(jobId, jobId);
+       }
+
        /**
         * Uploads a (different) byte array for each of the given jobs and 
verifies that deleting one of
         * them (via the {@link BlobCacheService}) does not influence the other.
@@ -98,7 +103,6 @@ public class BlobCacheDeleteTest extends TestLogger {
         */
        private void testDelete(@Nullable JobID jobId1, @Nullable JobID jobId2)
                        throws IOException {
-               final boolean sameJobId = (jobId1 == jobId2) || (jobId1 != null 
&& jobId1.equals(jobId2));
 
                final Configuration config = new Configuration();
                config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
@@ -122,9 +126,10 @@ public class BlobCacheDeleteTest extends TestLogger {
                        // put two more BLOBs (same key, other key) for another 
job ID
                        TransientBlobKey key2a = (TransientBlobKey) put(server, 
jobId2, data, TRANSIENT_BLOB);
                        assertNotNull(key2a);
-                       assertEquals(key1, key2a);
+                       BlobKeyTest.verifyKeyDifferentHashEquals(key1, key2a);
                        TransientBlobKey key2b = (TransientBlobKey) put(server, 
jobId2, data2, TRANSIENT_BLOB);
                        assertNotNull(key2b);
+                       BlobKeyTest.verifyKeyDifferentHashDifferent(key1, 
key2b);
 
                        // issue a DELETE request
                        assertTrue(delete(cache, jobId1, key1));
@@ -134,16 +139,15 @@ public class BlobCacheDeleteTest extends TestLogger {
                        // delete on server so that the cache cannot re-download
                        assertTrue(server.deleteInternal(jobId1, key1));
                        verifyDeleted(cache, jobId1, key1);
-                       // deleting a one BLOB should not affect another BLOB, 
even with the same key if job IDs are different
-                       if (!sameJobId) {
-                               verifyContents(server, jobId2, key2a, data);
-                       }
+                       // deleting one BLOB should not affect another BLOB 
with a different key
+                       // (and keys are always different now)
+                       verifyContents(server, jobId2, key2a, data);
                        verifyContents(server, jobId2, key2b, data2);
 
                        // delete first file of second job
                        assertTrue(delete(cache, jobId2, key2a));
-                       // delete only works on local cache (unless already 
deleted - key1 == key2a)!
-                       assertTrue(sameJobId || 
server.getStorageLocation(jobId2, key2a).exists());
+                       // delete only works on local cache
+                       assertTrue(server.getStorageLocation(jobId2, 
key2a).exists());
                        // delete on server so that the cache cannot re-download
                        assertTrue(server.deleteInternal(jobId2, key2a));
                        verifyDeleted(cache, jobId2, key2a);
@@ -151,7 +155,7 @@ public class BlobCacheDeleteTest extends TestLogger {
 
                        // delete second file of second job
                        assertTrue(delete(cache, jobId2, key2b));
-                       // delete only works on local cache (unless already 
deleted - key1 == key2a)!
+                       // delete only works on local cache
                        assertTrue(server.getStorageLocation(jobId2, 
key2b).exists());
                        // delete on server so that the cache cannot re-download
                        assertTrue(server.deleteInternal(jobId2, key2b));

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
index bed27d8..c760d04 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheGetTest.java
@@ -40,7 +40,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.file.AccessDeniedException;
-import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -57,6 +56,7 @@ import static 
org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventu
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType;
 import static org.apache.flink.runtime.blob.BlobServerDeleteTest.delete;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
@@ -152,37 +152,37 @@ public class BlobCacheGetTest extends TestLogger {
 
                        // add the same data under a second jobId
                        BlobKey key2 = put(server, jobId2, data, blobType);
-                       assertNotNull(key);
-                       assertEquals(key, key2);
+                       assertNotNull(key2);
+                       verifyKeyDifferentHashEquals(key, key2);
 
                        // request for jobId2 should succeed
-                       get(cache, jobId2, key);
+                       get(cache, jobId2, key2);
                        // request for jobId1 should still fail
                        verifyDeleted(cache, jobId1, key);
 
                        if (blobType == PERMANENT_BLOB) {
                                // still existing on server
-                               assertTrue(server.getStorageLocation(jobId2, 
key).exists());
+                               assertTrue(server.getStorageLocation(jobId2, 
key2).exists());
                                // delete jobId2 on cache
-                               blobFile = 
cache.getPermanentBlobService().getStorageLocation(jobId2, key);
+                               blobFile = 
cache.getPermanentBlobService().getStorageLocation(jobId2, key2);
                                assertTrue(blobFile.delete());
                                // try to retrieve again
-                               get(cache, jobId2, key);
+                               get(cache, jobId2, key2);
 
                                // delete on cache and server, verify that it 
is not accessible anymore
-                               blobFile = 
cache.getPermanentBlobService().getStorageLocation(jobId2, key);
+                               blobFile = 
cache.getPermanentBlobService().getStorageLocation(jobId2, key2);
                                assertTrue(blobFile.delete());
-                               blobFile = server.getStorageLocation(jobId2, 
key);
+                               blobFile = server.getStorageLocation(jobId2, 
key2);
                                assertTrue(blobFile.delete());
-                               verifyDeleted(cache, jobId2, key);
+                               verifyDeleted(cache, jobId2, key2);
                        } else {
                                // deleted eventually on the server by the GET 
request above
-                               verifyDeletedEventually(server, jobId2, key);
+                               verifyDeletedEventually(server, jobId2, key2);
                                // delete jobId2 on cache
-                               blobFile = 
cache.getTransientBlobService().getStorageLocation(jobId2, key);
+                               blobFile = 
cache.getTransientBlobService().getStorageLocation(jobId2, key2);
                                assertTrue(blobFile.delete());
                                // verify that it is not accessible anymore
-                               verifyDeleted(cache, jobId2, key);
+                               verifyDeleted(cache, jobId2, key2);
                        }
                }
        }
@@ -548,11 +548,6 @@ public class BlobCacheGetTest extends TestLogger {
 
                final byte[] data = {1, 2, 3, 4, 99, 42};
 
-               MessageDigest md = BlobUtils.createMessageDigest();
-
-               // create the correct blob key by hashing our input data
-               final BlobKey blobKey = BlobKey.createKey(blobType, 
md.digest(data));
-
                final ExecutorService executor = 
Executors.newFixedThreadPool(numberConcurrentGetOperations);
 
                try (
@@ -563,7 +558,7 @@ public class BlobCacheGetTest extends TestLogger {
                        server.start();
 
                        // upload data first
-                       assertEquals(blobKey, put(server, jobId, data, 
blobType));
+                       final BlobKey blobKey = put(server, jobId, data, 
blobType);
 
                        // now try accessing it concurrently (only HA mode will 
be able to retrieve it from HA store!)
                        for (int i = 0; i < numberConcurrentGetOperations; i++) 
{

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
index aa23c80..56258c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCachePutTest.java
@@ -58,6 +58,7 @@ import java.util.function.Supplier;
 import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFilesExist;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobKeyTest.verifyType;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static 
org.apache.flink.runtime.blob.BlobServerPutTest.BlockingInputStream;
@@ -275,6 +276,11 @@ public class BlobCachePutTest extends TestLogger {
                        BlobKey key1a = put(cache, jobId1, data, blobType);
                        assertNotNull(key1a);
                        verifyType(blobType, key1a);
+                       // second upload of same data should yield a different 
BlobKey
+                       BlobKey key1a2 = put(cache, jobId1, data, blobType);
+                       assertNotNull(key1a2);
+                       verifyType(blobType, key1a2);
+                       verifyKeyDifferentHashEquals(key1a, key1a2);
 
                        BlobKey key1b = put(cache, jobId1, data2, blobType);
                        assertNotNull(key1b);
@@ -282,19 +288,23 @@ public class BlobCachePutTest extends TestLogger {
 
                        // files should be available on the server
                        verifyContents(server, jobId1, key1a, data);
+                       verifyContents(server, jobId1, key1a2, data);
                        verifyContents(server, jobId1, key1b, data2);
 
                        // now put data for jobId2 and verify that both are ok
                        BlobKey key2a = put(cache, jobId2, data, blobType);
                        assertNotNull(key2a);
-                       assertEquals(key1a, key2a);
+                       verifyType(blobType, key2a);
+                       verifyKeyDifferentHashEquals(key1a, key2a);
 
                        BlobKey key2b = put(cache, jobId2, data2, blobType);
                        assertNotNull(key2b);
-                       assertEquals(key1b, key2b);
+                       verifyType(blobType, key2b);
+                       verifyKeyDifferentHashEquals(key1b, key2b);
 
                        // verify the accessibility and the BLOB contents
                        verifyContents(server, jobId1, key1a, data);
+                       verifyContents(server, jobId1, key1a2, data);
                        verifyContents(server, jobId1, key1b, data2);
                        verifyContents(server, jobId2, key2a, data);
                        verifyContents(server, jobId2, key2b, data2);
@@ -382,26 +392,32 @@ public class BlobCachePutTest extends TestLogger {
                        TransientBlobKey key1a =
                                (TransientBlobKey) put(cache, jobId1, new 
ByteArrayInputStream(data), TRANSIENT_BLOB);
                        assertNotNull(key1a);
+                       // second upload of same data should yield a different 
BlobKey
+                       BlobKey key1a2 = put(cache, jobId1, new 
ByteArrayInputStream(data), TRANSIENT_BLOB);
+                       assertNotNull(key1a2);
+                       verifyKeyDifferentHashEquals(key1a, key1a2);
 
                        TransientBlobKey key1b = (TransientBlobKey) put(cache, 
jobId1, new ByteArrayInputStream(data2), TRANSIENT_BLOB);
                        assertNotNull(key1b);
 
                        // files should be available on the server
                        verifyContents(server, jobId1, key1a, data);
+                       verifyContents(server, jobId1, key1a2, data);
                        verifyContents(server, jobId1, key1b, data2);
 
                        // now put data for jobId2 and verify that both are ok
                        TransientBlobKey key2a =
                                (TransientBlobKey) put(cache, jobId2, new 
ByteArrayInputStream(data), TRANSIENT_BLOB);
                        assertNotNull(key2a);
-                       assertEquals(key1a, key2a);
+                       verifyKeyDifferentHashEquals(key1a, key2a);
 
                        TransientBlobKey key2b = (TransientBlobKey) put(cache, 
jobId2, new ByteArrayInputStream(data2), TRANSIENT_BLOB);
                        assertNotNull(key2b);
-                       assertEquals(key1b, key2b);
+                       verifyKeyDifferentHashEquals(key1b, key2b);
 
                        // verify the accessibility and the BLOB contents
                        verifyContents(server, jobId1, key1a, data);
+                       verifyContents(server, jobId1, key1a2, data);
                        verifyContents(server, jobId1, key1b, data2);
                        verifyContents(server, jobId2, key2a, data);
                        verifyContents(server, jobId2, key2b, data2);
@@ -486,6 +502,10 @@ public class BlobCachePutTest extends TestLogger {
                        TransientBlobKey key1a =
                                (TransientBlobKey) put(cache, jobId1, new 
ChunkedInputStream(data, 19), TRANSIENT_BLOB);
                        assertNotNull(key1a);
+                       // second upload of same data should yield a different 
BlobKey
+                       BlobKey key1a2 = put(cache, jobId1, new 
ChunkedInputStream(data, 19), TRANSIENT_BLOB);
+                       assertNotNull(key1a2);
+                       verifyKeyDifferentHashEquals(key1a, key1a2);
 
                        TransientBlobKey key1b =
                                (TransientBlobKey) put(cache, jobId1, new 
ChunkedInputStream(data2, 19), TRANSIENT_BLOB);
@@ -493,21 +513,23 @@ public class BlobCachePutTest extends TestLogger {
 
                        // files should be available on the server
                        verifyContents(server, jobId1, key1a, data);
+                       verifyContents(server, jobId1, key1a2, data);
                        verifyContents(server, jobId1, key1b, data2);
 
                        // now put data for jobId2 and verify that both are ok
                        TransientBlobKey key2a =
                                (TransientBlobKey) put(cache, jobId2, new 
ChunkedInputStream(data, 19), TRANSIENT_BLOB);
                        assertNotNull(key2a);
-                       assertEquals(key1a, key2a);
+                       verifyKeyDifferentHashEquals(key1a, key2a);
 
                        TransientBlobKey key2b =
                                (TransientBlobKey) put(cache, jobId2, new 
ChunkedInputStream(data2, 19), TRANSIENT_BLOB);
                        assertNotNull(key2b);
-                       assertEquals(key1b, key2b);
+                       verifyKeyDifferentHashEquals(key1b, key2b);
 
                        // verify the accessibility and the BLOB contents
                        verifyContents(server, jobId1, key1a, data);
+                       verifyContents(server, jobId1, key1a2, data);
                        verifyContents(server, jobId1, key1b, data2);
                        verifyContents(server, jobId2, key2a, data);
                        verifyContents(server, jobId2, key2b, data2);
@@ -858,7 +880,8 @@ public class BlobCachePutTest extends TestLogger {
 
-                       // make sure that all blob keys are the same
+                       // make sure that all blob keys are unique but share the same hash
                        while (blobKeyIterator.hasNext()) {
-                               assertEquals(blobKey, blobKeyIterator.next());
+                               // check for unique BlobKey, but should have 
same hash
+                               verifyKeyDifferentHashEquals(blobKey, 
blobKeyIterator.next());
                        }
 
                        // check the uploaded file's contents

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
index 1a3f161..e275949 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheRecoveryTest.java
@@ -38,13 +38,11 @@ import java.util.Random;
 
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashDifferent;
+import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -131,10 +129,8 @@ public class BlobCacheRecoveryTest extends TestLogger {
 
                        // put non-HA data
                        nonHAKey = put(cache0, jobId[0], expected2, 
TRANSIENT_BLOB);
-                       assertNotEquals(keys[0], nonHAKey);
-                       assertThat(keys[0].getHash(), 
not(equalTo(nonHAKey.getHash())));
-                       assertNotEquals(keys[1], nonHAKey);
-                       assertThat(keys[1].getHash(), 
equalTo(nonHAKey.getHash()));
+                       verifyKeyDifferentHashDifferent(keys[0], nonHAKey);
+                       verifyKeyDifferentHashEquals(keys[1], nonHAKey);
 
                        // check that the storage directory exists
                        final Path blobServerPath = new Path(storagePath, 
"blob");

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 0e97604..9e4f4b7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -47,6 +47,7 @@ import java.util.Random;
 import static 
org.apache.flink.runtime.blob.BlobCachePutTest.verifyDeletedEventually;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -113,15 +114,13 @@ public class BlobClientTest extends TestLogger {
         *
         * @param file
         *              the file to prepare for the unit tests
-        * @param blobType
-        *              whether the BLOB should become permanent or transient
         *
-        * @return the BLOB key of the prepared file
+        * @return the message digest computed while preparing the file
         *
         * @throws IOException
         *              thrown if an I/O error occurs while writing to the test 
file
         */
-       private static BlobKey prepareTestFile(File file, BlobKey.BlobType 
blobType) throws IOException {
+       private static byte[] prepareTestFile(File file) throws IOException {
 
                MessageDigest md = BlobUtils.createMessageDigest();
 
@@ -145,7 +144,7 @@ public class BlobClientTest extends TestLogger {
                        }
                }
 
-               return BlobKey.createKey(blobType, md.digest());
+               return md.digest();
        }
 
        /**
@@ -254,35 +253,38 @@ public class BlobClientTest extends TestLogger {
                        byte[] testBuffer = createTestBuffer();
                        MessageDigest md = BlobUtils.createMessageDigest();
                        md.update(testBuffer);
-                       BlobKey origKey = BlobKey.createKey(blobType, 
md.digest());
+                       byte[] digest = md.digest();
 
                        InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
                        client = new BlobClient(serverAddress, 
getBlobClientConfig());
 
                        JobID jobId = new JobID();
-                       BlobKey receivedKey;
 
                        // Store the data (job-unrelated)
+                       BlobKey receivedKey1 = null;
                        if (blobType == TRANSIENT_BLOB) {
-                               receivedKey = client.putBuffer(null, 
testBuffer, 0, testBuffer.length, blobType);
-                               assertEquals(origKey, receivedKey);
+                               receivedKey1 = client.putBuffer(null, 
testBuffer, 0, testBuffer.length, blobType);
+                               assertArrayEquals(digest, 
receivedKey1.getHash());
                        }
 
                        // try again with a job-related BLOB:
-                       receivedKey = client.putBuffer(jobId, testBuffer, 0, 
testBuffer.length, blobType);
-                       assertEquals(origKey, receivedKey);
+                       BlobKey receivedKey2 = client.putBuffer(jobId, 
testBuffer, 0, testBuffer.length, blobType);
+                       assertArrayEquals(digest, receivedKey2.getHash());
+                       if (blobType == TRANSIENT_BLOB) {
+                               verifyKeyDifferentHashEquals(receivedKey1, 
receivedKey2);
+                       }
 
                        // Retrieve the data (job-unrelated)
                        if (blobType == TRANSIENT_BLOB) {
-                               validateGetAndClose(client.getInternal(null, 
receivedKey), testBuffer);
+                               validateGetAndClose(client.getInternal(null, 
receivedKey1), testBuffer);
                                // transient BLOBs should be deleted from the 
server, eventually
-                               verifyDeletedEventually(getBlobServer(), null, 
receivedKey);
+                               verifyDeletedEventually(getBlobServer(), null, 
receivedKey1);
                        }
                        // job-related
-                       validateGetAndClose(client.getInternal(jobId, 
receivedKey), testBuffer);
+                       validateGetAndClose(client.getInternal(jobId, 
receivedKey2), testBuffer);
                        if (blobType == TRANSIENT_BLOB) {
                                // transient BLOBs should be deleted from the 
server, eventually
-                               verifyDeletedEventually(getBlobServer(), jobId, 
receivedKey);
+                               verifyDeletedEventually(getBlobServer(), jobId, 
receivedKey2);
                        }
 
                        // Check reaction to invalid keys for job-unrelated 
blobs
@@ -342,41 +344,42 @@ public class BlobClientTest extends TestLogger {
                        throws IOException, InterruptedException {
 
                File testFile = temporaryFolder.newFile();
-               BlobKey origKey = prepareTestFile(testFile, blobType);
+               byte[] digest = prepareTestFile(testFile);
 
                InputStream is = null;
 
                try (BlobClient client = new BlobClient(new 
InetSocketAddress("localhost", getBlobServer().getPort()), 
getBlobClientConfig())) {
 
                        JobID jobId = new JobID();
-                       BlobKey receivedKey;
+                       BlobKey receivedKey1 = null;
 
                        // Store the data (job-unrelated)
                        if (blobType == TRANSIENT_BLOB) {
                                is = new FileInputStream(testFile);
-                               receivedKey = client.putInputStream(null, is, 
blobType);
-                               assertEquals(origKey, receivedKey);
+                               receivedKey1 = client.putInputStream(null, is, 
blobType);
+                               assertArrayEquals(digest, 
receivedKey1.getHash());
                        }
 
                        // try again with a job-related BLOB:
                        is = new FileInputStream(testFile);
-                       receivedKey = client.putInputStream(jobId, is, 
blobType);
-                       assertEquals(origKey, receivedKey);
+                       BlobKey receivedKey2 = client.putInputStream(jobId, is, 
blobType);
 
                        is.close();
                        is = null;
 
                        // Retrieve the data (job-unrelated)
                        if (blobType == TRANSIENT_BLOB) {
-                               validateGetAndClose(client.getInternal(null, 
receivedKey), testFile);
+                               verifyKeyDifferentHashEquals(receivedKey1, 
receivedKey2);
+
+                               validateGetAndClose(client.getInternal(null, 
receivedKey1), testFile);
                                // transient BLOBs should be deleted from the 
server, eventually
-                               verifyDeletedEventually(getBlobServer(), null, 
receivedKey);
+                               verifyDeletedEventually(getBlobServer(), null, 
receivedKey1);
                        }
                        // job-related
-                       validateGetAndClose(client.getInternal(jobId, 
receivedKey), testFile);
+                       validateGetAndClose(client.getInternal(jobId, 
receivedKey2), testFile);
                        if (blobType == TRANSIENT_BLOB) {
                                // transient BLOBs should be deleted from the 
server, eventually
-                               verifyDeletedEventually(getBlobServer(), jobId, 
receivedKey);
+                               verifyDeletedEventually(getBlobServer(), jobId, 
receivedKey2);
                        }
                } finally {
                        if (is != null) {
@@ -463,7 +466,7 @@ public class BlobClientTest extends TestLogger {
        static void uploadJarFile(BlobServer blobServer, Configuration 
blobClientConfig) throws Exception {
                final File testFile = File.createTempFile("testfile", ".dat");
                testFile.deleteOnExit();
-               prepareTestFile(testFile, PERMANENT_BLOB);
+               prepareTestFile(testFile);
 
                InetSocketAddress serverAddress = new 
InetSocketAddress("localhost", blobServer.getPort());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
index 49f4fc2..ae538aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobKeyTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -29,14 +30,17 @@ import java.io.IOException;
 
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.core.IsNot.not;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -46,21 +50,35 @@ public final class BlobKeyTest extends TestLogger {
        /**
         * The first key array to be used during the unit tests.
         */
-       private static final byte[] KEY_ARRAY_1 = new byte[20];
+       private static final byte[] KEY_ARRAY_1 = new byte[BlobKey.SIZE];
 
        /**
         * The second key array to be used during the unit tests.
         */
-       private static final byte[] KEY_ARRAY_2 = new byte[20];
+       private static final byte[] KEY_ARRAY_2 = new byte[BlobKey.SIZE];
+
+       /**
+        * First byte array to use for the random component of a {@link 
BlobKey}.
+        */
+       private static final byte[] RANDOM_ARRAY_1 = new byte[AbstractID.SIZE];
+
+       /**
+        * Second byte array to use for the random component of a {@link 
BlobKey}.
+        */
+       private static final byte[] RANDOM_ARRAY_2 = new byte[AbstractID.SIZE];
 
        /*
-        * Initialize the key array.
+        * Initialize the key and random arrays.
         */
        static {
                for (int i = 0; i < KEY_ARRAY_1.length; ++i) {
                        KEY_ARRAY_1[i] = (byte) i;
                        KEY_ARRAY_2[i] = (byte) (i + 1);
                }
+               for (int i = 0; i < RANDOM_ARRAY_1.length; ++i) {
+                       RANDOM_ARRAY_1[i] = (byte) i;
+                       RANDOM_ARRAY_2[i] = (byte) (i + 1);
+               }
        }
 
        @Test
@@ -89,7 +107,7 @@ public final class BlobKeyTest extends TestLogger {
         * Tests the serialization/deserialization of BLOB keys.
         */
        private void testSerialization(BlobKey.BlobType blobType) throws 
Exception {
-               final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+               final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
                final BlobKey k2 = CommonTestUtils.createCopySerializable(k1);
                assertEquals(k1, k2);
                assertEquals(k1.hashCode(), k2.hashCode());
@@ -107,16 +125,25 @@ public final class BlobKeyTest extends TestLogger {
        }
 
        /**
-        * Tests the equals method.
+        * Tests the {@link BlobKey#equals(Object)} and {@link 
BlobKey#hashCode()} methods.
         */
        private void testEquals(BlobKey.BlobType blobType) {
-               final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
-               final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1);
-               final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2);
+               final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
+               final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
+               final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2, 
RANDOM_ARRAY_1);
+               final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_2);
                assertTrue(k1.equals(k2));
                assertTrue(k2.equals(k1));
+               assertEquals(k1.hashCode(), k2.hashCode());
                assertFalse(k1.equals(k3));
                assertFalse(k3.equals(k1));
+               assertFalse(k1.equals(k4));
+               assertFalse(k4.equals(k1));
+
+               //noinspection ObjectEqualsNull
+               assertFalse(k1.equals(null));
+               //noinspection EqualsBetweenInconvertibleTypes
+               assertFalse(k1.equals(this));
        }
 
        /**
@@ -124,8 +151,8 @@ public final class BlobKeyTest extends TestLogger {
         */
        @Test
        public void testEqualsDifferentBlobType() {
-               final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, 
KEY_ARRAY_1);
-               final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, 
KEY_ARRAY_1);
+               final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, 
KEY_ARRAY_1, RANDOM_ARRAY_1);
+               final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, 
KEY_ARRAY_1, RANDOM_ARRAY_1);
                assertFalse(k1.equals(k2));
                assertFalse(k2.equals(k1));
        }
@@ -144,19 +171,22 @@ public final class BlobKeyTest extends TestLogger {
         * Tests the compares method.
         */
        private void testCompares(BlobKey.BlobType blobType) {
-               final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
-               final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1);
-               final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2);
+               final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
+               final BlobKey k2 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
+               final BlobKey k3 = BlobKey.createKey(blobType, KEY_ARRAY_2, 
RANDOM_ARRAY_1);
+               final BlobKey k4 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_2);
                assertThat(k1.compareTo(k2), is(0));
                assertThat(k2.compareTo(k1), is(0));
                assertThat(k1.compareTo(k3), lessThan(0));
+               assertThat(k1.compareTo(k4), lessThan(0));
                assertThat(k3.compareTo(k1), greaterThan(0));
+               assertThat(k4.compareTo(k1), greaterThan(0));
        }
 
        @Test
        public void testComparesDifferentBlobType() {
-               final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, 
KEY_ARRAY_1);
-               final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, 
KEY_ARRAY_1);
+               final BlobKey k1 = BlobKey.createKey(TRANSIENT_BLOB, 
KEY_ARRAY_1, RANDOM_ARRAY_1);
+               final BlobKey k2 = BlobKey.createKey(PERMANENT_BLOB, 
KEY_ARRAY_1, RANDOM_ARRAY_1);
                assertThat(k1.compareTo(k2), greaterThan(0));
                assertThat(k2.compareTo(k1), lessThan(0));
        }
@@ -175,7 +205,7 @@ public final class BlobKeyTest extends TestLogger {
         * Test the serialization/deserialization using input/output streams.
         */
        private void testStreams(BlobKey.BlobType blobType) throws IOException {
-               final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1);
+               final BlobKey k1 = BlobKey.createKey(blobType, KEY_ARRAY_1, 
RANDOM_ARRAY_1);
                final ByteArrayOutputStream baos = new 
ByteArrayOutputStream(20);
 
                k1.writeToOutputStream(baos);
@@ -188,6 +218,28 @@ public final class BlobKeyTest extends TestLogger {
        }
 
        /**
+        * Verifies that the two given keys are different in total but share 
the same hash.
+        *
+        * @param key1 first blob key
+        * @param key2 second blob key
+        */
+       static void verifyKeyDifferentHashEquals(BlobKey key1, BlobKey key2) {
+               assertNotEquals(key1, key2);
+               assertThat(key1.getHash(), equalTo(key2.getHash()));
+       }
+
+       /**
+        * Verifies that the two given keys are different in total and also 
have different hashes.
+        *
+        * @param key1 first blob key
+        * @param key2 second blob key
+        */
+       static void verifyKeyDifferentHashDifferent(BlobKey key1, BlobKey key2) 
{
+               assertNotEquals(key1, key2);
+               assertThat(key1.getHash(), not(equalTo(key2.getHash())));
+       }
+
+       /**
         * Verifies that the given <tt>key</tt> is of an expected type.
         *
         * @param expected the type the key should have

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index a110d4a..fde21ba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -47,10 +47,11 @@ import java.util.concurrent.Executors;
 import static 
org.apache.flink.runtime.blob.BlobCacheCleanupTest.checkFileCountForJob;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -86,6 +87,12 @@ public class BlobServerDeleteTest extends TestLogger {
                testDeleteTransient(new JobID(), new JobID());
        }
 
+       @Test
+       public void testDeleteTransient5() throws IOException {
+               JobID jobId = new JobID();
+               testDeleteTransient(jobId, jobId);
+       }
+
        /**
         * Uploads a (different) byte array for each of the given jobs and 
verifies that deleting one of
         * them (via the {@link BlobServer}) does not influence the other.
@@ -97,7 +104,6 @@ public class BlobServerDeleteTest extends TestLogger {
         */
        private void testDeleteTransient(@Nullable JobID jobId1, @Nullable 
JobID jobId2)
                        throws IOException {
-               final boolean sameJobId = (jobId1 == jobId2) || (jobId1 != null 
&& jobId1.equals(jobId2));
 
                final Configuration config = new Configuration();
                config.setString(BlobServerOptions.STORAGE_DIRECTORY, 
temporaryFolder.newFolder().getAbsolutePath());
@@ -118,7 +124,7 @@ public class BlobServerDeleteTest extends TestLogger {
                        // put two more BLOBs (same key, other key) for another 
job ID
                        TransientBlobKey key2a = (TransientBlobKey) put(server, 
jobId2, data, TRANSIENT_BLOB);
                        assertNotNull(key2a);
-                       assertEquals(key1, key2a);
+                       verifyKeyDifferentHashEquals(key1, key2a);
                        TransientBlobKey key2b = (TransientBlobKey) put(server, 
jobId2, data2, TRANSIENT_BLOB);
                        assertNotNull(key2b);
 
@@ -126,10 +132,9 @@ public class BlobServerDeleteTest extends TestLogger {
                        assertTrue(delete(server, jobId1, key1));
 
                        verifyDeleted(server, jobId1, key1);
-                       // deleting a one BLOB should not affect another BLOB, 
even with the same key if job IDs are different
-                       if (!sameJobId) {
-                               verifyContents(server, jobId2, key2a, data);
-                       }
+                       // deleting one BLOB should not affect another BLOB 
with a different key
+                       // (and keys are always different now)
+                       verifyContents(server, jobId2, key2a, data);
                        verifyContents(server, jobId2, key2b, data2);
 
                        // delete first file of second job
@@ -284,7 +289,7 @@ public class BlobServerDeleteTest extends TestLogger {
 
                        BlobKey key1a = put(server, jobId1, data, blobType);
                        BlobKey key2 = put(server, jobId2, data, blobType);
-                       assertEquals(key1a, key2);
+                       assertArrayEquals(key1a.getHash(), key2.getHash());
 
                        BlobKey key1b = put(server, jobId1, data2, blobType);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 4927279..e3b5309 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -42,7 +42,6 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.NoSuchFileException;
-import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -58,6 +57,7 @@ import java.util.concurrent.Executors;
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobUtils.JOB_DIR_PREFIX;
 import static org.junit.Assert.assertArrayEquals;
@@ -141,18 +141,18 @@ public class BlobServerGetTest extends TestLogger {
 
                        // add the same data under a second jobId
                        BlobKey key2 = put(server, jobId2, data, blobType);
-                       assertNotNull(key);
-                       assertEquals(key, key2);
+                       assertNotNull(key2);
+                       verifyKeyDifferentHashEquals(key, key2);
 
                        // request for jobId2 should succeed
-                       get(server, jobId2, key);
+                       get(server, jobId2, key2);
                        // request for jobId1 should still fail
                        verifyDeleted(server, jobId1, key);
 
                        // same checks as for jobId1 but for jobId2 should also 
work:
-                       blobFile = server.getStorageLocation(jobId2, key);
+                       blobFile = server.getStorageLocation(jobId2, key2);
                        assertTrue(blobFile.delete());
-                       verifyDeleted(server, jobId2, key);
+                       verifyDeleted(server, jobId2, key2);
                }
        }
 
@@ -373,11 +373,6 @@ public class BlobServerGetTest extends TestLogger {
 
                final byte[] data = {1, 2, 3, 4, 99, 42};
 
-               MessageDigest md = BlobUtils.createMessageDigest();
-
-               // create the correct blob key by hashing our input data
-               final BlobKey blobKey = BlobKey.createKey(blobType, 
md.digest(data));
-
                doAnswer(
                        new Answer() {
                                @Override
@@ -398,7 +393,7 @@ public class BlobServerGetTest extends TestLogger {
                        server.start();
 
                        // upload data first
-                       assertEquals(blobKey, put(server, jobId, data, 
blobType));
+                       final BlobKey blobKey = put(server, jobId, data, 
blobType);
 
                        // now try accessing it concurrently (only HA mode will 
be able to retrieve it from HA store!)
                        if (blobType == PERMANENT_BLOB) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index aefd0a3..dcf49a5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -62,6 +62,7 @@ import java.util.concurrent.Executors;
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.get;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -226,21 +227,26 @@ public class BlobServerPutTest extends TestLogger {
                        // put data for jobId1 and verify
                        BlobKey key1a = put(server, jobId1, data, blobType);
                        assertNotNull(key1a);
+                       // second upload of same data should yield a different 
BlobKey
+                       BlobKey key1a2 = put(server, jobId1, data, blobType);
+                       assertNotNull(key1a2);
+                       verifyKeyDifferentHashEquals(key1a, key1a2);
 
                        BlobKey key1b = put(server, jobId1, data2, blobType);
                        assertNotNull(key1b);
 
                        verifyContents(server, jobId1, key1a, data);
+                       verifyContents(server, jobId1, key1a2, data);
                        verifyContents(server, jobId1, key1b, data2);
 
                        // now put data for jobId2 and verify that both are ok
                        BlobKey key2a = put(server, jobId2, data, blobType);
                        assertNotNull(key2a);
-                       assertEquals(key1a, key2a);
+                       verifyKeyDifferentHashEquals(key1a, key2a);
 
                        BlobKey key2b = put(server, jobId2, data2, blobType);
                        assertNotNull(key2b);
-                       assertEquals(key1b, key2b);
+                       verifyKeyDifferentHashEquals(key1b, key2b);
 
                        // verify the accessibility and the BLOB contents
                        verifyContents(server, jobId2, key2a, data);
@@ -249,6 +255,7 @@ public class BlobServerPutTest extends TestLogger {
                        // verify the accessibility and the BLOB contents one 
more time (transient BLOBs should
                        // not be deleted here)
                        verifyContents(server, jobId1, key1a, data);
+                       verifyContents(server, jobId1, key1a2, data);
                        verifyContents(server, jobId1, key1b, data2);
                        verifyContents(server, jobId2, key2a, data);
                        verifyContents(server, jobId2, key2b, data2);
@@ -311,21 +318,26 @@ public class BlobServerPutTest extends TestLogger {
                        // put data for jobId1 and verify
                        BlobKey key1a = put(server, jobId1, new 
ByteArrayInputStream(data), blobType);
                        assertNotNull(key1a);
+                       // second upload of same data should yield a different 
BlobKey
+                       BlobKey key1a2 = put(server, jobId1, new 
ByteArrayInputStream(data), blobType);
+                       assertNotNull(key1a2);
+                       verifyKeyDifferentHashEquals(key1a, key1a2);
 
                        BlobKey key1b = put(server, jobId1, new 
ByteArrayInputStream(data2), blobType);
                        assertNotNull(key1b);
 
                        verifyContents(server, jobId1, key1a, data);
+                       verifyContents(server, jobId1, key1a2, data);
                        verifyContents(server, jobId1, key1b, data2);
 
                        // now put data for jobId2 and verify that both are ok
                        BlobKey key2a = put(server, jobId2, new 
ByteArrayInputStream(data), blobType);
                        assertNotNull(key2a);
-                       assertEquals(key1a, key2a);
+                       verifyKeyDifferentHashEquals(key1a, key2a);
 
                        BlobKey key2b = put(server, jobId2, new 
ByteArrayInputStream(data2), blobType);
                        assertNotNull(key2b);
-                       assertEquals(key1b, key2b);
+                       verifyKeyDifferentHashEquals(key1b, key2b);
 
                        // verify the accessibility and the BLOB contents
                        verifyContents(server, jobId2, key2a, data);
@@ -334,6 +346,7 @@ public class BlobServerPutTest extends TestLogger {
                        // verify the accessibility and the BLOB contents one 
more time (transient BLOBs should
                        // not be deleted here)
                        verifyContents(server, jobId1, key1a, data);
+                       verifyContents(server, jobId1, key1a2, data);
                        verifyContents(server, jobId1, key1b, data2);
                        verifyContents(server, jobId2, key2a, data);
                        verifyContents(server, jobId2, key2b, data2);
@@ -396,21 +409,26 @@ public class BlobServerPutTest extends TestLogger {
                        // put data for jobId1 and verify
                        BlobKey key1a = put(server, jobId1, new 
ChunkedInputStream(data, 19), blobType);
                        assertNotNull(key1a);
+                       // second upload of same data should yield a different 
BlobKey
+                       BlobKey key1a2 = put(server, jobId1, new 
ChunkedInputStream(data, 19), blobType);
+                       assertNotNull(key1a2);
+                       verifyKeyDifferentHashEquals(key1a, key1a2);
 
                        BlobKey key1b = put(server, jobId1, new 
ChunkedInputStream(data2, 19), blobType);
                        assertNotNull(key1b);
 
                        verifyContents(server, jobId1, key1a, data);
+                       verifyContents(server, jobId1, key1a2, data);
                        verifyContents(server, jobId1, key1b, data2);
 
                        // now put data for jobId2 and verify that both are ok
                        BlobKey key2a = put(server, jobId2, new 
ChunkedInputStream(data, 19), blobType);
                        assertNotNull(key2a);
-                       assertEquals(key1a, key2a);
+                       verifyKeyDifferentHashEquals(key1a, key2a);
 
                        BlobKey key2b = put(server, jobId2, new 
ChunkedInputStream(data2, 19), blobType);
                        assertNotNull(key2b);
-                       assertEquals(key1b, key2b);
+                       verifyKeyDifferentHashEquals(key1b, key2b);
 
                        // verify the accessibility and the BLOB contents
                        verifyContents(server, jobId2, key2a, data);
@@ -419,6 +437,7 @@ public class BlobServerPutTest extends TestLogger {
                        // verify the accessibility and the BLOB contents one 
more time (transient BLOBs should
                        // not be deleted here)
                        verifyContents(server, jobId1, key1a, data);
+                       verifyContents(server, jobId1, key1a2, data);
                        verifyContents(server, jobId1, key1b, data2);
                        verifyContents(server, jobId2, key2a, data);
                        verifyContents(server, jobId2, key2b, data2);
@@ -700,7 +719,7 @@ public class BlobServerPutTest extends TestLogger {
 
                        // make sure that all blob keys are the same
                        while (blobKeyIterator.hasNext()) {
-                               assertEquals(blobKey, blobKeyIterator.next());
+                               verifyKeyDifferentHashEquals(blobKey, 
blobKeyIterator.next());
                        }
 
                        // check the uploaded file's contents

http://git-wip-us.apache.org/repos/asf/flink/blob/f853f335/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
index 35575b5..5b8d0e9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerRecoveryTest.java
@@ -39,12 +39,10 @@ import java.util.Random;
 
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB;
 import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB;
+import static 
org.apache.flink.runtime.blob.BlobKeyTest.verifyKeyDifferentHashEquals;
 import static org.apache.flink.runtime.blob.BlobServerGetTest.verifyDeleted;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.put;
 import static org.apache.flink.runtime.blob.BlobServerPutTest.verifyContents;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -126,8 +124,7 @@ public class BlobServerRecoveryTest extends TestLogger {
 
                        // put non-HA data
                        nonHAKey = put(server0, jobId[0], expected2, 
TRANSIENT_BLOB);
-                       assertNotEquals(keys[1], nonHAKey);
-                       assertThat(keys[1].getHash(), 
equalTo(nonHAKey.getHash()));
+                       verifyKeyDifferentHashEquals(keys[1], nonHAKey);
 
                        // check that the storage directory exists
                        final Path blobServerPath = new Path(storagePath, 
"blob");

Reply via email to