Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r141406045
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -389,95 +413,332 @@ public File getFile(JobID jobId, BlobKey key) throws 
IOException {
         *
         * @param jobId
         *              ID of the job this blob belongs to (or <tt>null</tt> if 
job-unrelated)
    -    * @param requiredBlob
    +    * @param blobKey
         *              blob key associated with the requested file
    +    * @param highlyAvailable
    +    *              whether the requested file is highly available (HA)
         *
         * @return file referring to the local storage location of the BLOB
         *
         * @throws IOException
         *              Thrown if the file retrieval failed.
         */
    -   private File getFileInternal(@Nullable JobID jobId, BlobKey 
requiredBlob) throws IOException {
    -           checkArgument(requiredBlob != null, "BLOB key cannot be null.");
    +   private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey, 
boolean highlyAvailable) throws IOException {
    +           checkArgument(blobKey != null, "BLOB key cannot be null.");
     
    -           final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, requiredBlob);
    +           final File localFile = BlobUtils.getStorageLocation(storageDir, 
jobId, blobKey);
    +           readWriteLock.readLock().lock();
     
    -           if (localFile.exists()) {
    +           try {
    +                   getFileInternal(jobId, blobKey, highlyAvailable, 
localFile);
                        return localFile;
    +           } finally {
    +                   readWriteLock.readLock().unlock();
                }
    -           else {
    +   }
    +
    +   /**
    +    * Helper to retrieve the local path of a file associated with a job 
and a blob key.
    +    * <p>
    +    * The blob server looks the blob key up in its local storage. If the 
file exists, it is
    +    * returned. If the file does not exist, it is retrieved from the HA 
blob store (if available)
    +    * or a {@link FileNotFoundException} is thrown.
    +    * <p>
    +    * <strong>Assumes the read lock has already been acquired.</strong>
    +    *
    +    * @param jobId
    +    *              ID of the job this blob belongs to (or <tt>null</tt> if 
job-unrelated)
    +    * @param blobKey
    +    *              blob key associated with the requested file
    +    * @param highlyAvailable
    +    *              whether the requested file is highly available (HA)
    +    * @param localFile
    +    *      (local) file where the blob is/should be stored
    +    *
    +    * @throws IOException
    +    *              Thrown if the file retrieval failed.
    +    */
    +   void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, boolean 
highlyAvailable, File localFile) throws IOException {
    +           // assume readWriteLock.readLock() was already locked (cannot 
really check that)
    +
    +           if (localFile.exists()) {
    +                   return;
    +           } else if (highlyAvailable) {
    +                   // Try the HA blob store
    +                   // first we have to release the read lock in order to 
acquire the write lock
    +                   readWriteLock.readLock().unlock();
    +
    +                   // use a temporary file (thread-safe without locking)
    +                   File incomingFile = null;
                        try {
    -                           // Try the blob store
    -                           blobStore.get(jobId, requiredBlob, localFile);
    +                           incomingFile = createTemporaryFilename();
    +                           blobStore.get(jobId, blobKey, incomingFile);
    +
    +                           BlobUtils.moveTempFileToStore(
    +                                   incomingFile, jobId, blobKey, 
localFile, readWriteLock.writeLock(), LOG, null);
    --- End diff --
    
    Alright.


---

Reply via email to