Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5580#discussion_r176662188
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final
FileCache cache, final Logger log
//
------------------------------------------------------------------------
/**
- * Asynchronous file copy process.
- */
- private static class CopyProcess implements Callable<Path> {
-
- private final Path filePath;
- private final Path cachedPath;
- private boolean executable;
-
- public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
- this.filePath = new Path(e.filePath);
- this.executable = e.isExecutable;
- this.cachedPath = cachedPath;
- }
-
- @Override
- public Path call() throws IOException {
- // let exceptions propagate. we can retrieve them later
from
- // the future and report them upon access to the result
- copy(filePath, cachedPath, this.executable);
- return cachedPath;
- }
- }
-
- /**
- * If no task is using this file after 5 seconds, clear it.
+ * Asynchronous file copy process from blob server.
*/
- private static class DeleteProcess implements Runnable {
+ private static class CopyFromBlobProcess implements Callable<Path> {
- private final Object lock;
- private final Map<JobID, Map<String, Tuple4<Integer, File,
Path, Future<Path>>>> entries;
-
- private final String name;
+ private final PermanentBlobKey blobKey;
+ private final Path target;
+ private final boolean directory;
+ private final boolean executable;
private final JobID jobID;
+ private final PermanentBlobService blobService;
- public DeleteProcess(Object lock, Map<JobID, Map<String,
Tuple4<Integer, File, Path, Future<Path>>>> entries,
--- End diff --
Who is responsible for cleaning up the local files created by `CopyFromBlobProcess`?
---