Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r179057582
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
    @@ -631,14 +631,14 @@ else if (current == ExecutionState.CANCELING) {
                                                
DistributedCache.readFileInfoFromConfig(jobConfiguration))
                                {
                                        LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
    -                                   Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
    +                                   Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId, executionId);
                                        
distributedCacheEntries.put(entry.getKey(), cp);
                                }
                        }
                        catch (Exception e) {
                                throw new Exception(
    -                                   String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId),
    --- End diff --
    
    Please revert this change.


---

Reply via email to