[
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155567#comment-16155567
]
ASF GitHub Bot commented on FLINK-7068:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4358#discussion_r137271569
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException {
*
* @param jobId
* ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
- * @param requiredBlob
+ * @param blobKey
* blob key associated with the requested file
+ * @param highlyAvailable
+ * whether to the requested file is highly available (HA)
*
* @return file referring to the local storage location of the BLOB
*
* @throws IOException
* Thrown if the file retrieval failed.
*/
- private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException {
- checkArgument(requiredBlob != null, "BLOB key cannot be null.");
+ private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException {
+ checkArgument(blobKey != null, "BLOB key cannot be null.");
- final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob);
+ final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);
+ readWriteLock.readLock().lock();
- if (localFile.exists()) {
+ try {
+ getFileInternal(jobId, blobKey, highlyAvailable, localFile);
return localFile;
+ } finally {
+ readWriteLock.readLock().unlock();
}
- else {
+ }
+
+ /**
+ * Helper to retrieve the local path of a file associated with a job and a blob key.
+ * <p>
+ * The blob server looks the blob key up in its local storage. If the file exists, it is
+ * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
+ * or a {@link FileNotFoundException} is thrown.
+ * <p>
+ * <strong>Assumes the read lock has already been acquired.</strong>
+ *
+ * @param jobId
+ * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
+ * @param blobKey
+ * blob key associated with the requested file
+ * @param highlyAvailable
+ * whether to the requested file is highly available (HA)
+ * @param localFile
+ * (local) file where the blob is/should be stored
+ *
+ * @throws IOException
+ * Thrown if the file retrieval failed.
+ */
+ void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable, File localFile) throws IOException {
+ // assume readWriteLock.readLock() was already locked (cannot really check that)
+
+ if (localFile.exists()) {
+ return;
+ } else if (highlyAvailable) {
+ // Try the HA blob store
+ // first we have to release the read lock in order to acquire the write lock
+ readWriteLock.readLock().unlock();
+
+ // use a temporary file (thread-safe without locking)
+ File incomingFile = null;
try {
- // Try the blob store
- blobStore.get(jobId, requiredBlob, localFile);
+ incomingFile = createTemporaryFilename();
+ blobStore.get(jobId, blobKey, incomingFile);
+
+ BlobUtils.moveTempFileToStore(
+ incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null);
--- End diff --
I'm not sure the `writeLock` should escape the scope of the `BlobServer`
via `BlobUtils.moveTempFileToStore`. I think it would be better to acquire the
lock in the caller, outside of `moveTempFileToStore`. This should also give a
better separation of concerns.
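
To illustrate the suggestion, here is a minimal sketch of locking in the caller so that the file-moving helper never sees the lock. It is not the actual Flink code: the class `LockingOwnerSketch`, the method `store`, and the two-argument `moveTempFileToStore` shown here are hypothetical stand-ins, and the real `BlobUtils.moveTempFileToStore` takes more parameters (job ID, blob key, logger, blob store).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch: the lock stays private to the class that owns the storage. */
class LockingOwnerSketch {
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    /**
     * Lock-free helper (stand-in for a utility such as moveTempFileToStore).
     * It only deals with files and assumes the caller holds whatever lock is needed.
     */
    static void moveTempFileToStore(Path incomingFile, Path storageFile) throws IOException {
        if (Files.exists(storageFile)) {
            // Someone else stored the blob already: drop our temporary copy.
            Files.deleteIfExists(incomingFile);
        } else {
            Files.move(incomingFile, storageFile);
        }
    }

    /** The owner acquires and releases the write lock around the helper call. */
    void store(Path incomingFile, Path storageFile) throws IOException {
        readWriteLock.writeLock().lock();
        try {
            moveTempFileToStore(incomingFile, storageFile);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
}
```

With this shape the helper can be tested without any locking setup, and every place that takes the write lock is visible inside the owning class.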
> change BlobService sub-classes for permanent and transient BLOBs
> ----------------------------------------------------------------
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.
> which even does not have to be reflected by files.