[
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072333#comment-16072333
]
ASF GitHub Bot commented on FLINK-7057:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4238#discussion_r125249421
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -162,105 +164,116 @@ static File initStorageDirectory(String storageDirectory) throws
}
/**
- * Returns the BLOB service's directory for incoming files. The directory is created if it did
- * not exist so far.
+ * Returns the BLOB service's directory for incoming (job-unrelated) files. The directory is
+ * created if it does not exist yet.
+ *
+ * @param storageDir
+ * storage directory used be the BLOB service
*
- * @return the BLOB server's directory for incoming files
+ * @return the BLOB service's directory for incoming files
*/
static File getIncomingDirectory(File storageDir) {
final File incomingDir = new File(storageDir, "incoming");
- if (!incomingDir.mkdirs() && !incomingDir.exists()) {
- throw new RuntimeException("Cannot create directory for incoming files " + incomingDir.getAbsolutePath());
- }
+ mkdirTolerateExisting(incomingDir, "incoming");
return incomingDir;
}
/**
- * Returns the BLOB service's directory for cached files. The directory is created if it did
- * not exist so far.
+ * Makes sure a given directory exists by creating it if necessary.
*
- * @return the BLOB server's directory for cached files
+ * @param dir
+ * directory to create
+ * @param dirType
+ * the type of the directory (included in error message if something fails)
*/
- private static File getCacheDirectory(File storageDir) {
- final File cacheDirectory = new File(storageDir, "cache");
-
- if (!cacheDirectory.mkdirs() && !cacheDirectory.exists()) {
- throw new RuntimeException("Could not create cache directory '" + cacheDirectory.getAbsolutePath() + "'.");
+ private static void mkdirTolerateExisting(final File dir, final String dirType) {
+ // note: thread-safe create should try to mkdir first and then ignore the case that the
+ // directory already existed
+ if (!dir.mkdirs() && !dir.exists()) {
+ throw new RuntimeException(
+ "Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'.");
}
-
- return cacheDirectory;
}
/**
* Returns the (designated) physical storage location of the BLOB with the given key.
*
+ * @param storageDir
+ * storage directory used be the BLOB service
* @param key
- * the key identifying the BLOB
+ * the key identifying the BLOB
+ * @param jobId
+ * ID of the job for the incoming files (or <tt>null</tt> if job-unrelated)
+ *
* @return the (designated) physical storage location of the BLOB
*/
- static File getStorageLocation(File storageDir, BlobKey key) {
- return new File(getCacheDirectory(storageDir), BLOB_FILE_PREFIX + key.toString());
- }
+ static File getStorageLocation(
+ @Nonnull File storageDir, @Nullable JobID jobId, @Nonnull BlobKey key) {
+ File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));
- /**
- * Returns the (designated) physical storage location of the BLOB with the given job ID and key.
- *
- * @param jobID
- * the ID of the job the BLOB belongs to
- * @param key
- * the key of the BLOB
- * @return the (designated) physical storage location of the BLOB with the given job ID and key
- */
- static File getStorageLocation(File storageDir, JobID jobID, String key) {
- return new File(getJobDirectory(storageDir, jobID), BLOB_FILE_PREFIX + encodeKey(key));
+ mkdirTolerateExisting(file.getParentFile(), "cache");
--- End diff --
Why are we creating a `cache` directory here?
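
For reference, the helper called in the last line of the diff can be exercised in isolation. The sketch below copies `mkdirTolerateExisting` from the diff and calls it on the parent of a made-up file path. The class name `MkdirSketch`, the temp-directory layout, and the names `some-parent` and `blob_example` are assumptions for illustration only, not Flink code. As far as the diff shows, the second argument is only a label used in the error message; the directory that gets created is whatever `getParentFile()` returns.

```java
import java.io.File;

class MkdirSketch {

    // Copied from the diff (visibility relaxed so it can be called from a demo):
    // try to create first, then tolerate the case that the directory already exists.
    static void mkdirTolerateExisting(final File dir, final String dirType) {
        if (!dir.mkdirs() && !dir.exists()) {
            throw new RuntimeException(
                "Cannot create " + dirType + " directory '" + dir.getAbsolutePath() + "'.");
        }
    }

    public static void main(String[] args) {
        // Hypothetical storage layout; the real path comes from getStorageLocationPath(),
        // which is not shown in the diff.
        File storageDir = new File(
            System.getProperty("java.io.tmpdir"), "blob-sketch-" + System.nanoTime());
        File file = new File(new File(storageDir, "some-parent"), "blob_example");

        // "cache" is only the label for the error message, not a path component.
        mkdirTolerateExisting(file.getParentFile(), "cache");
        // A second call finds the directory already present and returns silently.
        mkdirTolerateExisting(file.getParentFile(), "cache");

        System.out.println("created: " + file.getParentFile().getAbsolutePath());
        System.out.println("'cache' dir exists: " + new File(storageDir, "cache").exists());
    }
}
```

So the call does not necessarily create a directory named `cache`; whether the label still fits the new per-job layout is a separate question.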
> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}}
> level but rather per job. Therefore, the cleanup process should be adapted,
> too.
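
To make the per-job idea in the issue description concrete, here is a minimal sketch of reference counting keyed by job rather than by BLOB key. It is not the `BlobCache` implementation: the class `JobRefCounter`, its method names, and the use of plain `String` job IDs are all hypothetical and chosen only to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one counter per job instead of one per BLOB key.
class JobRefCounter {

    private final Map<String, Integer> refs = new HashMap<>();

    // Called whenever something starts using the job's BLOBs.
    synchronized void register(String jobId) {
        refs.merge(jobId, 1, Integer::sum);
    }

    // Returns true when the last reference is gone, i.e. all BLOBs of this job
    // could now be cleaned up together.
    synchronized boolean release(String jobId) {
        Integer count = refs.get(jobId);
        if (count == null) {
            return false; // unknown job, nothing to release
        }
        if (count == 1) {
            refs.remove(jobId);
            return true;
        }
        refs.put(jobId, count - 1);
        return false;
    }

    synchronized int count(String jobId) {
        return refs.getOrDefault(jobId, 0);
    }

    public static void main(String[] args) {
        JobRefCounter counter = new JobRefCounter();
        counter.register("job-a");
        counter.register("job-a");
        System.out.println(counter.release("job-a")); // still referenced once
        System.out.println(counter.release("job-a")); // last reference released
    }
}
```

With a counter per job, cleanup can remove a job's whole directory once `release` reports the last reference, instead of tracking each BLOB separately.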