This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c22d01d  [FLINK-17463][tests] Avoid concurrent directory creation and 
deletion
c22d01d is described below

commit c22d01d3bfbb1384f98664361f1491b806e95798
Author: Gary Yao <[email protected]>
AuthorDate: Wed May 27 15:23:47 2020 +0200

    [FLINK-17463][tests] Avoid concurrent directory creation and deletion
    
    BlobCacheCleanupTest#testPermanentBlobCleanup() tests that job-related
    files are cleaned up by a background task when the job is released from
    the PermanentBlobCache. The test asserts that the uploaded blobs are
    deleted from the filesystem. Because the scheduling of the background
    task cannot be controlled from outside the cache, the test polls the
    filesystem. More precisely, the test uses BlobUtils#getStorageLocation()
    to build the path on the filesystem given a blobkey and tests the
    existence of that path in regular intervals. As a side effect, however,
    BlobUtils#getStorageLocation() also creates all necessary directories to
    that path if they do not exist yet. This leads to a situation where
    directories are concurrently deleted and created, which can cause
    FileAlreadyExists exceptions. This commit fixes the issue.
    
    Note that the above applies to all tests that invoke
    BlobServerCleanupTest#checkFilesExist().
    
    This closes #12376.
---
 .../java/org/apache/flink/runtime/blob/AbstractBlobCache.java  |  4 ++++
 .../main/java/org/apache/flink/runtime/blob/BlobServer.java    |  4 ++++
 .../org/apache/flink/runtime/blob/BlobServerCleanupTest.java   | 10 ++++++----
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
index ce12898..8a873f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
@@ -122,6 +122,10 @@ public abstract class AbstractBlobCache implements 
Closeable {
                this.serverAddress = serverAddress;
        }
 
+       public File getStorageDir() {
+               return storageDir;
+       }
+
        /**
         * Returns local copy of the file for the BLOB with the given key.
         *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index a47040c..a50f535 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -213,6 +213,10 @@ public class BlobServer extends Thread implements 
BlobService, BlobWriter, Perma
        //  Path Accessors
        // 
--------------------------------------------------------------------------------------------
 
+       public File getStorageDir() {
+               return storageDir;
+       }
+
        /**
         * Returns a file handle to the file associated with the given blob key 
on the blob
         * server.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
index aafba30..04c1187 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
@@ -204,20 +204,22 @@ public class BlobServerCleanupTest extends TestLogger {
                int numFiles = 0;
 
                for (BlobKey key : keys) {
-                       final File blobFile;
+                       final File storageDir;
                        if (blobService instanceof BlobServer) {
                                BlobServer server = (BlobServer) blobService;
-                               blobFile = server.getStorageLocation(jobId, 
key);
+                               storageDir = server.getStorageDir();
                        } else if (blobService instanceof PermanentBlobCache) {
                                PermanentBlobCache cache = (PermanentBlobCache) 
blobService;
-                               blobFile = cache.getStorageLocation(jobId, key);
+                               storageDir = cache.getStorageDir();
                        } else if (blobService instanceof TransientBlobCache) {
                                TransientBlobCache cache = (TransientBlobCache) 
blobService;
-                               blobFile = cache.getStorageLocation(jobId, key);
+                               storageDir = cache.getStorageDir();
                        } else {
                                throw new UnsupportedOperationException(
                                        "unsupported BLOB service class: " + 
blobService.getClass().getCanonicalName());
                        }
+
+                       final File blobFile = new 
File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId, 
key));
                        if (blobFile.exists()) {
                                ++numFiles;
                        } else if (doThrow) {

Reply via email to