Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r141406045 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws IOException { * * @param jobId * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated) - * @param requiredBlob + * @param blobKey * blob key associated with the requested file + * @param highlyAvailable + * whether the requested file is highly available (HA) * * @return file referring to the local storage location of the BLOB * * @throws IOException * Thrown if the file retrieval failed. */ - private File getFileInternal(@Nullable JobID jobId, BlobKey requiredBlob) throws IOException { - checkArgument(requiredBlob != null, "BLOB key cannot be null."); + private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable) throws IOException { + checkArgument(blobKey != null, "BLOB key cannot be null."); - final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, requiredBlob); + final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); + readWriteLock.readLock().lock(); - if (localFile.exists()) { + try { + getFileInternal(jobId, blobKey, highlyAvailable, localFile); return localFile; + } finally { + readWriteLock.readLock().unlock(); } - else { + } + + /** + * Helper to retrieve the local path of a file associated with a job and a blob key. + * <p> + * The blob server looks the blob key up in its local storage. If the file exists, it is + * returned. If the file does not exist, it is retrieved from the HA blob store (if available) + * or a {@link FileNotFoundException} is thrown. 
+ * <p> + * <strong>Assumes the read lock has already been acquired.</strong> + * + * @param jobId + * ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated) + * @param blobKey + * blob key associated with the requested file + * @param highlyAvailable + * whether the requested file is highly available (HA) + * @param localFile + * (local) file where the blob is/should be stored + * + * @throws IOException + * Thrown if the file retrieval failed. + */ + void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean highlyAvailable, File localFile) throws IOException { + // assume readWriteLock.readLock() was already locked (cannot really check that) + + if (localFile.exists()) { + return; + } else if (highlyAvailable) { + // Try the HA blob store + // first we have to release the read lock in order to acquire the write lock + readWriteLock.readLock().unlock(); + + // use a temporary file (thread-safe without locking) + File incomingFile = null; try { - // Try the blob store - blobStore.get(jobId, requiredBlob, localFile); + incomingFile = createTemporaryFilename(); + blobStore.get(jobId, blobKey, incomingFile); + + BlobUtils.moveTempFileToStore( + incomingFile, jobId, blobKey, localFile, readWriteLock.writeLock(), LOG, null); --- End diff -- Alright.
---