Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179057166
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -262,106 +216,120 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log
                );
        }
     
    +   public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
    +           checkNotNull(jobId);
    +
    +           synchronized (lock) {
    +                   Set<ExecutionAttemptID> jobRefCounter = 
jobRefHolders.get(jobId);
    +
    +                   if (jobRefCounter == null || jobRefCounter.isEmpty()) {
    +                           LOG.warn("improper use of releaseJob() without 
a matching number of createTmpFiles() calls for jobId " + jobId);
    +                           return;
    +                   }
    +
    +                   jobRefCounter.remove(executionId);
    +                   if (jobRefCounter.isEmpty()) {
    +                           executorService.schedule(new 
DeleteProcess(jobId), cleanupInterval, TimeUnit.SECONDS);
    +                   }
    +           }
    +   }
    +
        // ------------------------------------------------------------------------
        //  background processes
        // ------------------------------------------------------------------------
     
        /**
    -    * Asynchronous file copy process.
    +    * Asynchronous file copy process from blob server.
         */
    -   private static class CopyProcess implements Callable<Path> {
    +   private static class CopyFromBlobProcess implements Callable<Path> {
     
    -           private final Path filePath;
    -           private final Path cachedPath;
    -           private boolean executable;
    +           private final PermanentBlobKey blobKey;
    +           private final Path target;
    +           private final boolean isDirectory;
    +           private final boolean isExecutable;
    +           private final JobID jobID;
    +           private final PermanentBlobService blobService;
     
    -           public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
    -                   this.filePath = new Path(e.filePath);
    -                   this.executable = e.isExecutable;
    -                   this.cachedPath = cachedPath;
    +           CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
    +                   try {
    +                           this.isExecutable = e.isExecutable;
    +                           this.isDirectory = e.isZipped;
    +                           this.jobID = jobID;
    +                           this.blobService = blobService;
    +                           this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
    +                           this.target = target;
    +                   } catch (Exception ex) {
    +                           throw new RuntimeException(ex);
    +                   }
                }
     
                @Override
                public Path call() throws IOException {
    -                   // let exceptions propagate. we can retrieve them later 
from
    -                   // the future and report them upon access to the result
    -                   copy(filePath, cachedPath, this.executable);
    -                   return cachedPath;
    +                   final File file = blobService.getFile(jobID, blobKey);
    +
    +                   if (isDirectory) {
    +                           try (ZipInputStream zis = new 
ZipInputStream(new FileInputStream(file))) {
    +                                   ZipEntry entry;
    +                                   while ((entry = zis.getNextEntry()) != 
null) {
    +                                           String fileName = 
entry.getName();
    +                                           Path newFile = new Path(target, 
fileName);
    +                                           if (entry.isDirectory()) {
    +                                                   
target.getFileSystem().mkdirs(newFile);
    +                                           } else {
    +                                                   try (FSDataOutputStream 
fsDataOutputStream = target.getFileSystem()
    +                                                                   
.create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) {
    +                                                           
IOUtils.copyBytes(zis, fsDataOutputStream, false);
    +                                                   }
    +                                                   //noinspection 
ResultOfMethodCallIgnored
    +                                                   new 
File(newFile.getPath()).setExecutable(isExecutable);
    +                                           }
    +                                           zis.closeEntry();
    +                                   }
    +                           }
    +                           Files.delete(file.toPath());
    +                           return target;
    +                   } else {
    +                           //noinspection ResultOfMethodCallIgnored
    +                           file.setExecutable(isExecutable);
    +                           return Path.fromLocalFile(file);
    +                   }
    +
                }
        }
     
        /**
         * If no task is using this file after 5 seconds, clear it.
         */
    -   private static class DeleteProcess implements Runnable {
    -
    -           private final Object lock;
    -           private final Map<JobID, Map<String, Tuple4<Integer, File, 
Path, Future<Path>>>> entries;
    +   private class DeleteProcess implements Runnable {
     
    -           private final String name;
                private final JobID jobID;
     
    -           public DeleteProcess(Object lock, Map<JobID, Map<String, 
Tuple4<Integer, File, Path, Future<Path>>>> entries,
    -                                                           String name, 
JobID jobID) {
    -                   this.lock = lock;
    -                   this.entries = entries;
    -                   this.name = name;
    +           DeleteProcess(JobID jobID) {
                        this.jobID = jobID;
                }
     
                @Override
                public void run() {
                        try {
                                synchronized (lock) {
    -                                   Map<String, Tuple4<Integer, File, Path, 
Future<Path>>> jobEntries = entries.get(jobID);
     
    -                                   if (jobEntries != null) {
    -                                           Tuple4<Integer, File, Path, 
Future<Path>> entry = jobEntries.get(name);
    +                                   if (jobRefHolders.get(jobID).isEmpty()) 
{
    +                                           // abort the copy
    +                                           for (Future<Path> fileFuture : 
entries.get(jobID).values()) {
    +                                                   fileFuture.cancel(true);
    +                                           }
     
    -                                           if (entry != null) {
    -                                                   int count = entry.f0;
    -                                                   if (count > 1) {
    -                                                           // multiple 
references still
    -                                                           entry.f0 = 
count - 1;
    -                                                   }
    -                                                   else {
    -                                                           // we remove 
the last reference
    -                                                           
jobEntries.remove(name);
    -                                                           if 
(jobEntries.isEmpty()) {
    -                                                                   
entries.remove(jobID);
    -                                                           }
    -
    -                                                           // abort the 
copy
    -                                                           
entry.f3.cancel(true);
    -
    -                                                           // remove the 
file
    -                                                           File file = new 
File(entry.f2.toString());
    -                                                           if 
(file.exists()) {
    -                                                                   if 
(file.isDirectory()) {
    -                                                                           
FileUtils.deleteDirectory(file);
    -                                                                   }
    -                                                                   else if 
(!file.delete()) {
    -                                                                           
LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
    -                                                                   }
    -                                                           }
    -
    -                                                           // remove the 
job wide temp directory, if it is now empty
    -                                                           File parent = 
entry.f1;
    -                                                           if 
(parent.isDirectory()) {
    -                                                                   
String[] children = parent.list();
    -                                                                   if 
(children == null || children.length == 0) {
    -                                                                           
//noinspection ResultOfMethodCallIgnored
    -                                                                           
parent.delete();
    -                                                                   }
    -                                                           }
    -                                                   }
    +                                           // remove the job wide temp 
directories
    +                                           for (File storageDirectory : 
storageDirectories) {
    +                                                   File tempDir = new 
File(storageDirectory, jobID.toString());
    +                                                   
FileUtils.deleteDirectory(tempDir);
                                                }
                                        }
                                }
    -                   }
    -                   catch (IOException e) {
    +                   } catch (IOException e) {
                                LOG.error("Could not delete file from local 
file cache.", e);
                        }
                }
        }
    +
    --- End diff --
    
    revert (this added trailing blank line)


---

Reply via email to