[FLINK-6020] Introduce BlobServer#readWriteLock to synchronize file creation 
and deletion

This commit introduces a BlobServer#readWriteLock in order to synchronize file
creation and deletion operations in BlobServerConnection and BlobServer. This
prevents multiple put and delete operations from interfering with each other
and with get operations.

The get operations are synchronized using the read lock so that multiple get
operations can still run in parallel.

Add Get and Delete operation tests

This closes #3888.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ad489d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ad489d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ad489d8

Branch: refs/heads/master
Commit: 7ad489d87281b74c53d3b1a0dd97e56b7a8ef303
Parents: 88b0f2a
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed May 10 17:38:49 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed May 17 08:19:05 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/blob/BlobClient.java   |   2 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  29 ++-
 .../runtime/blob/BlobServerConnection.java      | 240 +++++++++++++++----
 .../runtime/blob/BlobServerDeleteTest.java      |  73 +++++-
 .../flink/runtime/blob/BlobServerGetTest.java   | 115 ++++++++-
 .../flink/runtime/blob/BlobServerPutTest.java   | 109 ++++++++-
 .../src/test/resources/log4j-test.properties    |   2 +-
 7 files changed, 509 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
index 49e54a1..fab3c5c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java
@@ -537,7 +537,7 @@ public final class BlobClient implements Closeable {
                        throw new IOException("Server side error: " + 
cause.getMessage(), cause);
                }
                else {
-                       throw new IOException("Unrecognized response");
+                       throw new IOException("Unrecognized response: " + 
response + '.');
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 937eab0..5ad4b6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -44,6 +44,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -85,6 +87,9 @@ public class BlobServer extends Thread implements BlobService 
{
        /** The maximum number of concurrent connections */
        private final int maxConnections;
 
+       /** Lock guarding concurrent file accesses */
+       private final ReadWriteLock readWriteLock;
+
        /**
         * Shutdown hook thread to ensure deletion of the storage directory (or 
<code>null</code> if
         * the configured high availability mode does not equal{@link 
HighAvailabilityMode#NONE})
@@ -104,6 +109,7 @@ public class BlobServer extends Thread implements 
BlobService {
        public BlobServer(Configuration config, BlobStore blobStore) throws 
IOException {
                this.blobServiceConfiguration = checkNotNull(config);
                this.blobStore = checkNotNull(blobStore);
+               this.readWriteLock = new ReentrantReadWriteLock();
 
                // configure and create the storage directory
                String storageDirectory = 
config.getString(BlobServerOptions.STORAGE_DIRECTORY);
@@ -235,6 +241,13 @@ public class BlobServer extends Thread implements 
BlobService {
                return blobStore;
        }
 
+       /**
+        * Returns the lock used to guard file accesses
+        */
+       public ReadWriteLock getReadWriteLock() {
+               return readWriteLock;
+       }
+
        @Override
        public void run() {
                try {
@@ -395,13 +408,19 @@ public class BlobServer extends Thread implements 
BlobService {
        public void delete(BlobKey key) throws IOException {
                final File localFile = BlobUtils.getStorageLocation(storageDir, 
key);
 
-               if (localFile.exists()) {
-                       if (!localFile.delete()) {
-                               LOG.warn("Failed to delete locally BLOB " + key 
+ " at " + localFile.getAbsolutePath());
+               readWriteLock.writeLock().lock();
+
+               try {
+                       if (localFile.exists()) {
+                               if (!localFile.delete()) {
+                                       LOG.warn("Failed to delete locally BLOB 
" + key + " at " + localFile.getAbsolutePath());
+                               }
                        }
-               }
 
-               blobStore.delete(key);
+                       blobStore.delete(key);
+               } finally {
+                       readWriteLock.writeLock().unlock();
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
index 13a90c6..a76dbd5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.blob;
 
-import com.google.common.io.Files;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
@@ -33,7 +32,11 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
 import java.security.MessageDigest;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
 
 import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE;
 import static 
org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE;
@@ -67,6 +70,12 @@ class BlobServerConnection extends Thread {
        /** The HA blob store. */
        private final BlobStore blobStore;
 
+       /** Write lock to synchronize file accesses */
+       private final Lock writeLock;
+
+       /** Read lock to synchronize file accesses */
+       private final Lock readLock;
+
        /**
         * Creates a new BLOB connection for a client request
         * 
@@ -74,7 +83,7 @@ class BlobServerConnection extends Thread {
         * @param blobServer The BLOB server.
         */
        BlobServerConnection(Socket clientSocket, BlobServer blobServer) {
-               super("BLOB connection for " + 
clientSocket.getRemoteSocketAddress().toString());
+               super("BLOB connection for " + 
clientSocket.getRemoteSocketAddress());
                setDaemon(true);
 
                if (blobServer == null) {
@@ -84,6 +93,11 @@ class BlobServerConnection extends Thread {
                this.clientSocket = clientSocket;
                this.blobServer = blobServer;
                this.blobStore = blobServer.getBlobStore();
+
+               ReadWriteLock readWriteLock = blobServer.getReadWriteLock();
+
+               this.writeLock = readWriteLock.writeLock();
+               this.readLock = readWriteLock.readLock();
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -178,8 +192,13 @@ class BlobServerConnection extends Thread {
                 */
 
                File blobFile;
+               int contentAddressable = -1;
+               JobID jobId = null;
+               String key = null;
+               BlobKey blobKey = null;
+
                try {
-                       final int contentAddressable = inputStream.read();
+                       contentAddressable = inputStream.read();
 
                        if (contentAddressable < 0) {
                                throw new EOFException("Premature end of GET 
request");
@@ -189,37 +208,18 @@ class BlobServerConnection extends Thread {
                                byte[] jidBytes = new byte[JobID.SIZE];
                                readFully(inputStream, jidBytes, 0, JobID.SIZE, 
"JobID");
 
-                               JobID jobID = JobID.fromByteArray(jidBytes);
-                               String key = readKey(buf, inputStream);
-                               blobFile = 
this.blobServer.getStorageLocation(jobID, key);
-
-                               if (!blobFile.exists()) {
-                                       blobStore.get(jobID, key, blobFile);
-                               }
+                               jobId = JobID.fromByteArray(jidBytes);
+                               key = readKey(buf, inputStream);
+                               blobFile = blobServer.getStorageLocation(jobId, 
key);
                        }
                        else if (contentAddressable == CONTENT_ADDRESSABLE) {
-                               final BlobKey key = 
BlobKey.readFromInputStream(inputStream);
-                               blobFile = blobServer.getStorageLocation(key);
-
-                               if (!blobFile.exists()) {
-                                       blobStore.get(key, blobFile);
-                               }
+                               blobKey = 
BlobKey.readFromInputStream(inputStream);
+                               blobFile = 
blobServer.getStorageLocation(blobKey);
                        }
                        else {
-                               throw new IOException("Unknown type of BLOB 
addressing.");
-                       }
-
-                       // Check if BLOB exists
-                       if (!blobFile.exists()) {
-                               throw new IOException("Cannot find required 
BLOB at " + blobFile.getAbsolutePath());
-                       }
-
-                       if (blobFile.length() > Integer.MAX_VALUE) {
-                               throw new IOException("BLOB size exceeds the 
maximum size (2 GB).");
+                               throw new IOException("Unknown type of BLOB 
addressing: " + contentAddressable + '.');
                        }
 
-                       outputStream.write(RETURN_OKAY);
-
                        // up to here, an error can give a good message
                }
                catch (Throwable t) {
@@ -235,8 +235,58 @@ class BlobServerConnection extends Thread {
                        return;
                }
 
-               // from here on, we started sending data, so all we can do is 
close the connection when something happens
+               readLock.lock();
+
                try {
+                       try {
+                               if (!blobFile.exists()) {
+                                       // first we have to release the read 
lock in order to acquire the write lock
+                                       readLock.unlock();
+                                       writeLock.lock();
+
+                                       try {
+                                               if (blobFile.exists()) {
+                                                       LOG.debug("Blob file {} 
has downloaded from the BlobStore by a different connection.", blobFile);
+                                               } else {
+                                                       if (contentAddressable 
== NAME_ADDRESSABLE) {
+                                                               
blobStore.get(jobId, key, blobFile);
+                                                       } else if 
(contentAddressable == CONTENT_ADDRESSABLE) {
+                                                               
blobStore.get(blobKey, blobFile);
+                                                       } else {
+                                                               throw new 
IOException("Unknown type of BLOB addressing: " + contentAddressable + '.');
+                                                       }
+                                               }
+                                       } finally {
+                                               writeLock.unlock();
+                                       }
+
+                                       readLock.lock();
+
+                                       // Check if BLOB exists
+                                       if (!blobFile.exists()) {
+                                               throw new IOException("Cannot 
find required BLOB at " + blobFile.getAbsolutePath());
+                                       }
+                               }
+
+                               if (blobFile.length() > Integer.MAX_VALUE) {
+                                       throw new IOException("BLOB size 
exceeds the maximum size (2 GB).");
+                               }
+
+                               outputStream.write(RETURN_OKAY);
+                       } catch (Throwable t) {
+                               LOG.error("GET operation failed", t);
+                               try {
+                                       writeErrorToStream(outputStream, t);
+                               }
+                               catch (IOException e) {
+                                       // since we are in an exception case, 
it means not much that we could not send the error
+                                       // ignore this
+                               }
+                               clientSocket.close();
+                               return;
+                       }
+
+                       // from here on, we started sending data, so all we can 
do is close the connection when something happens
                        int blobLen = (int) blobFile.length();
                        writeLength(blobLen, outputStream);
 
@@ -251,14 +301,14 @@ class BlobServerConnection extends Thread {
                                        bytesRemaining -= read;
                                }
                        }
-               }
-               catch (SocketException e) {
+               } catch (SocketException e) {
                        // happens when the other side disconnects
                        LOG.debug("Socket connection closed", e);
-               }
-               catch (Throwable t) {
+               } catch (Throwable t) {
                        LOG.error("GET operation failed", t);
                        clientSocket.close();
+               } finally {
+                       readLock.unlock();
                }
        }
 
@@ -328,21 +378,83 @@ class BlobServerConnection extends Thread {
                        fos.close();
 
                        if (contentAddressable == NAME_ADDRESSABLE) {
-                               File storageFile = 
this.blobServer.getStorageLocation(jobID, key);
-                               Files.move(incomingFile, storageFile);
-                               incomingFile = null;
+                               File storageFile = 
blobServer.getStorageLocation(jobID, key);
 
-                               blobStore.put(storageFile, jobID, key);
+                               writeLock.lock();
+
+                               try {
+                                       // first check whether the file already 
exists
+                                       if (!storageFile.exists()) {
+                                               try {
+                                                       // only move the file 
if it does not yet exist
+                                                       
Files.move(incomingFile.toPath(), storageFile.toPath());
+
+                                                       incomingFile = null;
+
+                                               } catch 
(FileAlreadyExistsException ignored) {
+                                                       LOG.warn("Detected 
concurrent file modifications. This should only happen if multiple" +
+                                                               "BlobServer use 
the same storage directory.");
+                                                       // we cannot be sure at 
this point whether the file has already been uploaded to the blob
+                                                       // store or not. Even 
if the blobStore might shortly be in an inconsistent state, we have
+                                                       // persist the blob. 
Otherwise we might not be able to recover the job.
+                                               }
+
+                                               // only the one moving the 
incoming file to its final destination is allowed to upload the
+                                               // file to the blob store
+                                               blobStore.put(storageFile, 
jobID, key);
+                                       }
+                               } catch(IOException ioe) {
+                                       // we failed to either create the local 
storage file or to upload it --> try to delete the local file
+                                       // while still having the write lock
+                                       if (storageFile.exists() && 
!storageFile.delete()) {
+                                               LOG.warn("Could not delete the 
storage file.");
+                                       }
+
+                                       throw ioe;
+                               } finally {
+                                       writeLock.unlock();
+                               }
 
                                outputStream.write(RETURN_OKAY);
                        }
                        else {
                                BlobKey blobKey = new BlobKey(md.digest());
                                File storageFile = 
blobServer.getStorageLocation(blobKey);
-                               Files.move(incomingFile, storageFile);
-                               incomingFile = null;
 
-                               blobStore.put(storageFile, blobKey);
+                               writeLock.lock();
+
+                               try {
+                                       // first check whether the file already 
exists
+                                       if (!storageFile.exists()) {
+                                               try {
+                                                       // only move the file 
if it does not yet exist
+                                                       
Files.move(incomingFile.toPath(), storageFile.toPath());
+
+                                                       incomingFile = null;
+
+                                               } catch 
(FileAlreadyExistsException ignored) {
+                                                       LOG.warn("Detected 
concurrent file modifications. This should only happen if multiple" +
+                                                               "BlobServer use 
the same storage directory.");
+                                                       // we cannot be sure at 
this point whether the file has already been uploaded to the blob
+                                                       // store or not. Even 
if the blobStore might shortly be in an inconsistent state, we have
+                                                       // persist the blob. 
Otherwise we might not be able to recover the job.
+                                               }
+
+                                               // only the one moving the 
incoming file to its final destination is allowed to upload the
+                                               // file to the blob store
+                                               blobStore.put(storageFile, 
blobKey);
+                                       }
+                               } catch(IOException ioe) {
+                                       // we failed to either create the local 
storage file or to upload it --> try to delete the local file
+                                       // while still having the write lock
+                                       if (storageFile.exists() && 
!storageFile.delete()) {
+                                               LOG.warn("Could not delete the 
storage file.");
+                                       }
+
+                                       throw ioe;
+                               } finally {
+                                       writeLock.unlock();
+                               }
 
                                // Return computed key to client for validation
                                outputStream.write(RETURN_OKAY);
@@ -397,12 +509,21 @@ class BlobServerConnection extends Thread {
 
                        if (type == CONTENT_ADDRESSABLE) {
                                BlobKey key = 
BlobKey.readFromInputStream(inputStream);
-                               File blobFile = 
this.blobServer.getStorageLocation(key);
-                               if (blobFile.exists() && !blobFile.delete()) {
-                                       throw new IOException("Cannot delete 
BLOB file " + blobFile.getAbsolutePath());
-                               }
+                               File blobFile = 
blobServer.getStorageLocation(key);
+
+                               writeLock.lock();
 
-                               blobStore.delete(key);
+                               try {
+                                       // we should make the local and remote 
file deletion atomic, otherwise we might risk not
+                                       // removing the remote file in case of 
a concurrent put operation
+                                       if (blobFile.exists() && 
!blobFile.delete()) {
+                                               throw new IOException("Cannot 
delete BLOB file " + blobFile.getAbsolutePath());
+                                       }
+
+                                       blobStore.delete(key);
+                               } finally {
+                                       writeLock.unlock();
+                               }
                        }
                        else if (type == NAME_ADDRESSABLE) {
                                byte[] jidBytes = new byte[JobID.SIZE];
@@ -412,20 +533,37 @@ class BlobServerConnection extends Thread {
                                String key = readKey(buf, inputStream);
 
                                File blobFile = 
this.blobServer.getStorageLocation(jobID, key);
-                               if (blobFile.exists() && !blobFile.delete()) {
-                                       throw new IOException("Cannot delete 
BLOB file " + blobFile.getAbsolutePath());
-                               }
 
-                               blobStore.delete(jobID, key);
+                               writeLock.lock();
+
+                               try {
+                                       // we should make the local and remote 
file deletion atomic, otherwise we might risk not
+                                       // removing the remote file in case of 
a concurrent put operation
+                                       if (blobFile.exists() && 
!blobFile.delete()) {
+                                               throw new IOException("Cannot 
delete BLOB file " + blobFile.getAbsolutePath());
+                                       }
+
+                                       blobStore.delete(jobID, key);
+                               } finally {
+                                       writeLock.unlock();
+                               }
                        }
                        else if (type == JOB_ID_SCOPE) {
                                byte[] jidBytes = new byte[JobID.SIZE];
                                readFully(inputStream, jidBytes, 0, JobID.SIZE, 
"JobID");
                                JobID jobID = JobID.fromByteArray(jidBytes);
 
-                               blobServer.deleteJobDirectory(jobID);
+                               writeLock.lock();
+
+                               try {
+                                       // we should make the local and remote 
file deletion atomic, otherwise we might risk not
+                                       // removing the remote file in case of 
a concurrent put operation
+                                       blobServer.deleteJobDirectory(jobID);
 
-                               blobStore.deleteAll(jobID);
+                                       blobStore.deleteAll(jobID);
+                               } finally {
+                                       writeLock.unlock();
+                               }
                        }
                        else {
                                throw new IOException("Unrecognized addressing 
type: " + type);

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index e8e28a1..5e1d86e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -20,23 +20,35 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests how DELETE requests behave.
  */
-public class BlobServerDeleteTest {
+public class BlobServerDeleteTest extends TestLogger {
 
        private final Random rnd = new Random();
 
@@ -285,6 +297,65 @@ public class BlobServerDeleteTest {
                }
        }
 
+       /**
+        * FLINK-6020
+        *
+        * Tests that concurrent delete operations don't interfere with each 
other.
+        *
+        * Note: The test checks that there cannot be two threads which have 
checked whether a given blob file exist
+        * and then one of them fails deleting it. Without the introduced lock, 
this situation should rarely happen
+        * and make this test fail. Thus, if this test should become 
"unstable", then the delete atomicity is most likely
+        * broken.
+        */
+       @Test
+       public void testConcurrentDeleteOperations() throws IOException, 
ExecutionException, InterruptedException {
+               final Configuration configuration = new Configuration();
+               final BlobStore blobStore = mock(BlobStore.class);
+
+               final int concurrentDeleteOperations = 3;
+               final ExecutorService executor = 
Executors.newFixedThreadPool(concurrentDeleteOperations);
+
+               final List<Future<Void>> deleteFutures = new 
ArrayList<>(concurrentDeleteOperations);
+
+               final byte[] data = {1, 2, 3};
+
+               try (final BlobServer blobServer = new 
BlobServer(configuration, blobStore)) {
+
+                       final BlobKey blobKey;
+
+                       try (BlobClient client = blobServer.createClient()) {
+                               blobKey = client.put(data);
+                       }
+
+                       
assertTrue(blobServer.getStorageLocation(blobKey).exists());
+
+                       for (int i = 0; i < concurrentDeleteOperations; i++) {
+                               Future<Void> deleteFuture = 
FlinkCompletableFuture.supplyAsync(new Callable<Void>() {
+                                       @Override
+                                       public Void call() throws Exception {
+                                               try (BlobClient blobClient = 
blobServer.createClient()) {
+                                                       
blobClient.delete(blobKey);
+                                               }
+
+                                               return null;
+                                       }
+                               }, executor);
+
+                               deleteFutures.add(deleteFuture);
+                       }
+
+                       Future<Void> waitFuture = 
FutureUtils.waitForAll(deleteFutures);
+
+                       // make sure all delete operation have completed 
successfully
+                       // in case of no lock, one of the delete operations 
should eventually fail
+                       waitFuture.get();
+
+                       
assertFalse(blobServer.getStorageLocation(blobKey).exists());
+               } finally {
+                       executor.shutdownNow();
+               }
+       }
+
        private void cleanup(BlobServer server, BlobClient client) {
                if (client != null) {
                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 6d1dba8..3209648 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -18,27 +18,57 @@
 
 package org.apache.flink.runtime.blob;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests how failing GET requests behave in the presence of failures.
  * Successful GET requests are tested in conjunction wit the PUT
  * requests.
  */
-public class BlobServerGetTest {
+public class BlobServerGetTest extends TestLogger {
 
        private final Random rnd = new Random();
 
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
        @Test
        public void testGetFailsDuringLookup() throws IOException {
                BlobServer server = null;
@@ -128,4 +158,87 @@ public class BlobServerGetTest {
                        }
                }
        }
+
+       /**
+        * FLINK-6020
+        *
+        * Tests that concurrent get operations for the same key don't concurrently access the BlobStore: the blob is downloaded only once and every client reads the expected bytes.
+        */
+       @Test
+       public void testConcurrentGetOperations() throws IOException, ExecutionException, InterruptedException {
+               final Configuration configuration = new Configuration();
+
+               configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
+
+               final BlobStore blobStore = mock(BlobStore.class);
+
+               final int numberConcurrentGetOperations = 3;
+               final List<Future<InputStream>> getOperations = new ArrayList<>(numberConcurrentGetOperations);
+
+               final byte[] data = {1, 2, 3, 4, 99, 42};
+               final ByteArrayInputStream bais = new ByteArrayInputStream(data);
+
+               MessageDigest md = BlobUtils.createMessageDigest();
+
+               // create the correct blob key by hashing our input data
+               final BlobKey blobKey = new BlobKey(md.digest(data));
+               // 'bais' is shared by all invocations of the mocked get and can be consumed only once: a second download would write an empty file
+               doAnswer(
+                       new Answer<Void>() {
+                               @Override
+                               public Void answer(InvocationOnMock invocation) throws Throwable {
+                                       File targetFile = (File) invocation.getArguments()[1];
+
+                                       FileUtils.copyInputStreamToFile(bais, targetFile);
+
+                                       return null;
+                               }
+                       }
+               ).when(blobStore).get(any(BlobKey.class), any(File.class));
+
+               final ExecutorService executor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
+
+               try (final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
+                       for (int i = 0; i < numberConcurrentGetOperations; i++) {
+                               Future<InputStream> getOperation = FlinkCompletableFuture.supplyAsync(new Callable<InputStream>() {
+                                       @Override
+                                       public InputStream call() throws Exception {
+                                               try (BlobClient blobClient = blobServer.createClient();
+                                                        InputStream inputStream = blobClient.get(blobKey)) {
+                                                       byte[] buffer = new byte[data.length];
+
+                                                       IOUtils.readFully(inputStream, buffer);
+                                                       // return an in-memory copy because the client and its stream are closed when leaving this try block
+                                                       return new ByteArrayInputStream(buffer);
+                                               }
+                                       }
+                               }, executor);
+
+                               getOperations.add(getOperation);
+                       }
+
+                       Future<Collection<InputStream>> inputStreamsFuture = FutureUtils.combineAll(getOperations);
+
+                       Collection<InputStream> inputStreams = inputStreamsFuture.get();
+
+                       // check that we have read the right data
+                       for (InputStream inputStream : inputStreams) {
+                               ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length);
+
+                               IOUtils.copy(inputStream, baos);
+
+                               baos.close();
+                               byte[] input = baos.toByteArray();
+
+                               assertArrayEquals(data, input);
+
+                               inputStream.close();
+                       }
+
+                       // verify that we downloaded the requested blob exactly once from the BlobStore
+                       verify(blobStore, times(1)).get(eq(blobKey), any(File.class));
+               } finally {
+                       executor.shutdownNow();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 441ca7d..35ef968 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -20,7 +20,12 @@ package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -28,16 +33,29 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for successful and failing PUT operations against the BLOB server,
  * and successful GET operations.
  */
-public class BlobServerPutTest {
+public class BlobServerPutTest extends TestLogger {
 
        private final Random rnd = new Random();
 
@@ -299,6 +317,95 @@ public class BlobServerPutTest {
                }
        }
 
+       /**
+        * FLINK-6020
+        *
+        * Tests that concurrent put operations will only upload the file once to the {@link BlobStore}.
+        */
+       @Test
+       public void testConcurrentPutOperations() throws IOException, ExecutionException, InterruptedException {
+               final Configuration configuration = new Configuration();
+               BlobStore blobStore = mock(BlobStore.class);
+               int concurrentPutOperations = 2;
+               int dataSize = 1024;
+               // one latch count per put operation: BlockingInputStream#read blocks until every operation has started reading
+               final CountDownLatch countDownLatch = new CountDownLatch(concurrentPutOperations);
+               final byte[] data = new byte[dataSize];
+
+               ArrayList<Future<BlobKey>> allFutures = new ArrayList<>(concurrentPutOperations);
+
+               ExecutorService executor = Executors.newFixedThreadPool(concurrentPutOperations);
+
+               try (
+                       final BlobServer blobServer = new BlobServer(configuration, blobStore)) {
+
+                       for (int i = 0; i < concurrentPutOperations; i++) {
+                               Future<BlobKey> putFuture = FlinkCompletableFuture.supplyAsync(new Callable<BlobKey>() {
+                                       @Override
+                                       public BlobKey call() throws Exception {
+                                               try (BlobClient blobClient = blobServer.createClient()) {
+                                                       return blobClient.put(new BlockingInputStream(countDownLatch, data));
+                                               }
+                                       }
+                               }, executor);
+
+                               allFutures.add(putFuture);
+                       }
+
+                       FutureUtils.ConjunctFuture<Collection<BlobKey>> conjunctFuture = FutureUtils.combineAll(allFutures);
+
+                       // wait until all operations have completed and check that no exception was thrown
+                       Collection<BlobKey> blobKeys = conjunctFuture.get();
+
+                       Iterator<BlobKey> blobKeyIterator = blobKeys.iterator();
+
+                       assertTrue(blobKeyIterator.hasNext());
+
+                       BlobKey blobKey = blobKeyIterator.next();
+
+                       // make sure that all blob keys are the same
+                       while (blobKeyIterator.hasNext()) {
+                               assertEquals(blobKey, blobKeyIterator.next());
+                       }
+
+                       // check that we only uploaded the file once to the blob store
+                       verify(blobStore, times(1)).put(any(File.class), eq(blobKey));
+               } finally {
+                       executor.shutdownNow();
+               }
+       }
+
+       private static final class BlockingInputStream extends InputStream {
+               // test stream over 'data': every read() counts the latch down and waits for it to reach zero before serving a byte
+               private final CountDownLatch countDownLatch;
+               private final byte[] data;
+               private int index = 0;
+
+               public BlockingInputStream(CountDownLatch countDownLatch, byte[] data) {
+                       this.countDownLatch = Preconditions.checkNotNull(countDownLatch);
+                       this.data = Preconditions.checkNotNull(data);
+               }
+
+               @Override
+               public int read() throws IOException {
+                       // once the latch has reached zero, countDown() has no effect and await() returns immediately
+                       countDownLatch.countDown();
+
+                       try {
+                               countDownLatch.await();
+                       } catch (InterruptedException e) {
+                               Thread.currentThread().interrupt();
+                               throw new IOException("Blocking operation was interrupted.", e);
+                       }
+                       // InputStream#read must return 0..255 or -1 for end of stream, so the signed byte is masked below
+                       if (index >= data.length) {
+                               return -1;
+                       } else {
+                               return data[index++] & 0xFF;
+                       }
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        private static final class ChunkedInputStream extends InputStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/7ad489d8/flink-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/resources/log4j-test.properties 
b/flink-runtime/src/test/resources/log4j-test.properties
index 98f136a..7ba1633 100644
--- a/flink-runtime/src/test/resources/log4j-test.properties
+++ b/flink-runtime/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=INFO, console
+log4j.rootLogger=OFF, console
 
 # -----------------------------------------------------------------------------
 # Console (use 'console')

Reply via email to