Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5580#discussion_r179057582
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -631,14 +631,14 @@ else if (current == ExecutionState.CANCELING) {
DistributedCache.readFileInfoFromConfig(jobConfiguration))
{
LOG.info("Obtaining local cache file
for '{}'.", entry.getKey());
- Future<Path> cp =
fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId);
+ Future<Path> cp =
fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId, executionId);
distributedCacheEntries.put(entry.getKey(), cp);
}
}
catch (Exception e) {
throw new Exception(
- String.format("Exception while adding
files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId),
--- End diff --
revert
---