Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176769129
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
    @@ -143,30 +160,23 @@ public void shutdown() {
        /**
         * If the file doesn't exists locally, it will copy the file to the 
temp directory.
         *
    -    * @param name  The name under which the file is registered.
         * @param entry The cache entry descriptor (path, executable flag)
         * @param jobID The ID of the job for which the file is copied.
         * @return The handle to the task that copies the file.
         */
    -   public Future<Path> createTmpFile(String name, DistributedCacheEntry 
entry, JobID jobID) {
    +   public Future<Path> createTmpFile(String name, DistributedCacheEntry 
entry, JobID jobID, ExecutionAttemptID executionId) {
                synchronized (lock) {
    -                   Map<String, Tuple4<Integer, File, Path, Future<Path>>> 
jobEntries = entries.get(jobID);
    -                   if (jobEntries == null) {
    -                           jobEntries = new HashMap<String, 
Tuple4<Integer, File, Path, Future<Path>>>();
    -                           entries.put(jobID, jobEntries);
    -                   }
    +                   Map<String, Future<Path>> jobEntries = 
entries.computeIfAbsent(jobID, k -> new HashMap<>());
    +                   final Set<ExecutionAttemptID> refHolders = 
jobRefHolders.computeIfAbsent(jobID, id -> new HashSet<>());
    +                   refHolders.add(executionId);
     
                        // tuple is (ref-count, parent-temp-dir, 
cached-file-path, copy-process)
    --- End diff --
    
    outdated


---

Reply via email to