This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c22d01d [FLINK-17463][tests] Avoid concurrent directory creation and
deletion
c22d01d is described below
commit c22d01d3bfbb1384f98664361f1491b806e95798
Author: Gary Yao <[email protected]>
AuthorDate: Wed May 27 15:23:47 2020 +0200
[FLINK-17463][tests] Avoid concurrent directory creation and deletion
BlobCacheCleanupTest#testPermanentBlobCleanup() tests that job-related
files are cleaned up by a background task when the job is released from
the PermanentBlobCache. The test asserts that the uploaded blobs are
deleted from the filesystem. Because the scheduling of the background
task cannot be controlled from outside the cache, the test polls the
filesystem. More precisely, the test uses BlobUtils#getStorageLocation()
to build the path on the filesystem given a blobkey and tests the
existence of that path in regular intervals. As a side effect, however,
BlobUtils#getStorageLocation() also creates all necessary directories to
that path if they do not exist yet. This leads to a situation where
directories are concurrently deleted and created, which can cause
FileAlreadyExists exceptions. This commit fixes the issue by having the
test build the path with BlobUtils#getStorageLocationPath() instead.
Note that the above applies to all tests that invoke
BlobServerCleanupTest#checkFilesExist().
This closes #12376.
---
.../java/org/apache/flink/runtime/blob/AbstractBlobCache.java | 4 ++++
.../main/java/org/apache/flink/runtime/blob/BlobServer.java | 4 ++++
.../org/apache/flink/runtime/blob/BlobServerCleanupTest.java | 10 ++++++----
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
index ce12898..8a873f1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
@@ -122,6 +122,10 @@ public abstract class AbstractBlobCache implements
Closeable {
this.serverAddress = serverAddress;
}
+ public File getStorageDir() {
+ return storageDir;
+ }
+
/**
* Returns local copy of the file for the BLOB with the given key.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index a47040c..a50f535 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -213,6 +213,10 @@ public class BlobServer extends Thread implements
BlobService, BlobWriter, Perma
// Path Accessors
//
--------------------------------------------------------------------------------------------
+ public File getStorageDir() {
+ return storageDir;
+ }
+
/**
* Returns a file handle to the file associated with the given blob key
on the blob
* server.
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
index aafba30..04c1187 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerCleanupTest.java
@@ -204,20 +204,22 @@ public class BlobServerCleanupTest extends TestLogger {
int numFiles = 0;
for (BlobKey key : keys) {
- final File blobFile;
+ final File storageDir;
if (blobService instanceof BlobServer) {
BlobServer server = (BlobServer) blobService;
- blobFile = server.getStorageLocation(jobId,
key);
+ storageDir = server.getStorageDir();
} else if (blobService instanceof PermanentBlobCache) {
PermanentBlobCache cache = (PermanentBlobCache)
blobService;
- blobFile = cache.getStorageLocation(jobId, key);
+ storageDir = cache.getStorageDir();
} else if (blobService instanceof TransientBlobCache) {
TransientBlobCache cache = (TransientBlobCache)
blobService;
- blobFile = cache.getStorageLocation(jobId, key);
+ storageDir = cache.getStorageDir();
} else {
throw new UnsupportedOperationException(
"unsupported BLOB service class: " +
blobService.getClass().getCanonicalName());
}
+
+ final File blobFile = new
File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId,
key));
if (blobFile.exists()) {
++numFiles;
} else if (doThrow) {