GitHub user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5580#discussion_r179057166 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java --- @@ -262,106 +216,120 @@ private static Thread createShutdownHook(final FileCache cache, final Logger log ); } + public void releaseJob(JobID jobId, ExecutionAttemptID executionId) { + checkNotNull(jobId); + + synchronized (lock) { + Set<ExecutionAttemptID> jobRefCounter = jobRefHolders.get(jobId); + + if (jobRefCounter == null || jobRefCounter.isEmpty()) { + LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId); + return; + } + + jobRefCounter.remove(executionId); + if (jobRefCounter.isEmpty()) { + executorService.schedule(new DeleteProcess(jobId), cleanupInterval, TimeUnit.SECONDS); + } + } + } + // ------------------------------------------------------------------------ // background processes // ------------------------------------------------------------------------ /** - * Asynchronous file copy process. + * Asynchronous file copy process from blob server. 
*/ - private static class CopyProcess implements Callable<Path> { + private static class CopyFromBlobProcess implements Callable<Path> { - private final Path filePath; - private final Path cachedPath; - private boolean executable; + private final PermanentBlobKey blobKey; + private final Path target; + private final boolean isDirectory; + private final boolean isExecutable; + private final JobID jobID; + private final PermanentBlobService blobService; - public CopyProcess(DistributedCacheEntry e, Path cachedPath) { - this.filePath = new Path(e.filePath); - this.executable = e.isExecutable; - this.cachedPath = cachedPath; + CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) { + try { + this.isExecutable = e.isExecutable; + this.isDirectory = e.isZipped; + this.jobID = jobID; + this.blobService = blobService; + this.blobKey = InstantiationUtil.deserializeObject(e.blobKey, Thread.currentThread().getContextClassLoader()); + this.target = target; + } catch (Exception ex) { + throw new RuntimeException(ex); + } } @Override public Path call() throws IOException { - // let exceptions propagate. 
we can retrieve them later from - // the future and report them upon access to the result - copy(filePath, cachedPath, this.executable); - return cachedPath; + final File file = blobService.getFile(jobID, blobKey); + + if (isDirectory) { + try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + String fileName = entry.getName(); + Path newFile = new Path(target, fileName); + if (entry.isDirectory()) { + target.getFileSystem().mkdirs(newFile); + } else { + try (FSDataOutputStream fsDataOutputStream = target.getFileSystem() + .create(newFile, FileSystem.WriteMode.NO_OVERWRITE)) { + IOUtils.copyBytes(zis, fsDataOutputStream, false); + } + //noinspection ResultOfMethodCallIgnored + new File(newFile.getPath()).setExecutable(isExecutable); + } + zis.closeEntry(); + } + } + Files.delete(file.toPath()); + return target; + } else { + //noinspection ResultOfMethodCallIgnored + file.setExecutable(isExecutable); + return Path.fromLocalFile(file); + } + } } /** * If no task is using this file after 5 seconds, clear it. 
*/ - private static class DeleteProcess implements Runnable { - - private final Object lock; - private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries; + private class DeleteProcess implements Runnable { - private final String name; private final JobID jobID; - public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries, - String name, JobID jobID) { - this.lock = lock; - this.entries = entries; - this.name = name; + DeleteProcess(JobID jobID) { this.jobID = jobID; } @Override public void run() { try { synchronized (lock) { - Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID); - if (jobEntries != null) { - Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name); + if (jobRefHolders.get(jobID).isEmpty()) { + // abort the copy + for (Future<Path> fileFuture : entries.get(jobID).values()) { + fileFuture.cancel(true); + } - if (entry != null) { - int count = entry.f0; - if (count > 1) { - // multiple references still - entry.f0 = count - 1; - } - else { - // we remove the last reference - jobEntries.remove(name); - if (jobEntries.isEmpty()) { - entries.remove(jobID); - } - - // abort the copy - entry.f3.cancel(true); - - // remove the file - File file = new File(entry.f2.toString()); - if (file.exists()) { - if (file.isDirectory()) { - FileUtils.deleteDirectory(file); - } - else if (!file.delete()) { - LOG.error("Could not delete locally cached file " + file.getAbsolutePath()); - } - } - - // remove the job wide temp directory, if it is now empty - File parent = entry.f1; - if (parent.isDirectory()) { - String[] children = parent.list(); - if (children == null || children.length == 0) { - //noinspection ResultOfMethodCallIgnored - parent.delete(); - } - } - } + // remove the job wide temp directories + for (File storageDirectory : storageDirectories) { + File tempDir = new File(storageDirectory, jobID.toString()); + 
FileUtils.deleteDirectory(tempDir); } } } - } - catch (IOException e) { + } catch (IOException e) { LOG.error("Could not delete file from local file cache.", e); } } } + --- End diff -- revert
---