[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14295026#comment-14295026 ]
ASF GitHub Bot commented on FLINK-1419:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/339#discussion_r23680555
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
@@ -179,27 +179,40 @@ public Path call() {
private String name;
private JobID jobID;
- private int oldCount;
+ private String filePath;
public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
this.name = name;
this.jobID = jobID;
- this.oldCount = c;
+ this.filePath = e.filePath;
}
+
@Override
public void run() {
- synchronized (count) {
- if (count.get(new ImmutablePair<JobID, String>(jobID, name)) != oldCount) {
+ Pair<JobID, String> key = new ImmutablePair<JobID, String>(jobID, name);
+ synchronized (lock) {
+ if (!count.containsKey(key)) {
return;
}
- }
- Path tmp = getTempDir(jobID, "");
- try {
- if (lfs.exists(tmp)) {
- lfs.delete(tmp, true);
+ count.put(key, count.get(key) - 1);
+ if (count.get(key) != 0) {
+ return;
+ }
+ Path tmp = getTempDir(jobID, filePath.substring(filePath.lastIndexOf("/") + 1));
+ try {
+ if (lfs.exists(tmp)) {
+ lfs.delete(tmp, true);
+ }
+ count.remove(key);
+ if (count.isEmpty()) { //delete job directory
--- End diff --
This probably won't work, because there might still be other jobs running
on the TaskManager which have not yet deleted all of their files, so
`count.isEmpty()` does not tell us whether this job still has files. The problem
is the ```Pair<JobID, Filename>``` key. Either we change count to something like
```HashMap[JobID, HashMap[String, Integer]]```, or we have to iterate over all
entries to check whether an entry with the respective jobID still exists.
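A minimal sketch of the first option (counts nested per job, then per file name), assuming job IDs are plain `String`s instead of Flink's `JobID`. The class `JobFileRefCounts`, its `Cleanup` enum and the `acquire`/`release` methods are hypothetical illustrations, not part of `FileCache`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: reference counts keyed by job, then by file name,
 * i.e. HashMap[JobID, HashMap[String, Integer]]. Job IDs are plain Strings
 * here instead of Flink's JobID.
 */
class JobFileRefCounts {

    /** What the caller should delete after a release. */
    enum Cleanup { NOTHING, FILE, FILE_AND_JOB_DIR }

    private final Map<String, Map<String, Integer>> counts = new HashMap<>();

    /** Registers one more user of the given file for the given job. */
    synchronized void acquire(String jobId, String fileName) {
        counts.computeIfAbsent(jobId, k -> new HashMap<>())
              .merge(fileName, 1, Integer::sum);
    }

    /** Drops one user and reports which on-disk artifacts are now unused. */
    synchronized Cleanup release(String jobId, String fileName) {
        Map<String, Integer> files = counts.get(jobId);
        if (files == null || !files.containsKey(fileName)) {
            return Cleanup.NOTHING; // unknown entry, nothing to delete
        }
        int remaining = files.get(fileName) - 1;
        if (remaining > 0) {
            files.put(fileName, remaining);
            return Cleanup.NOTHING;
        }
        files.remove(fileName);
        if (files.isEmpty()) {
            // Only this job's inner map is consulted, so files still held
            // by other jobs do not block deleting this job's directory.
            counts.remove(jobId);
            return Cleanup.FILE_AND_JOB_DIR;
        }
        return Cleanup.FILE;
    }

    public static void main(String[] args) {
        JobFileRefCounts refs = new JobFileRefCounts();
        refs.acquire("jobA", "data.csv");
        refs.acquire("jobA", "data.csv");
        refs.acquire("jobB", "model.bin");
        System.out.println(refs.release("jobA", "data.csv"));  // NOTHING
        System.out.println(refs.release("jobA", "data.csv"));  // FILE_AND_JOB_DIR
        System.out.println(refs.release("jobB", "model.bin")); // FILE_AND_JOB_DIR
    }
}
```

Because `release` only inspects the inner map of a single job, a job's directory becomes deletable as soon as that job's last file is released, independent of other jobs. The second option (scanning every `Pair<JobID, String>` key for a matching job) would give the same answer, but at a cost linear in the number of entries on each release.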
> DistributedCache doesn't preserve files for subsequent operations
> -----------------------------------------------------------------
>
> Key: FLINK-1419
> URL: https://issues.apache.org/jira/browse/FLINK-1419
> Project: Flink
> Issue Type: Bug
> Affects Versions: 0.8, 0.9
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
>
> When subsequent operations want to access the same files in the
> DistributedCache (DC), the files are frequently not created for the following
> operation. This is fairly odd, since the DC is supposed to either a) preserve
> the files when another operation kicks in within a certain time window, or
> b) simply recreate the deleted files. Neither happens.
> Increasing the time window had no effect.
> I'd like to use this issue as a starting point for a more general discussion
> about the DistributedCache.
> Currently:
> 1. all files reside in a common job-specific directory, and
> 2. they are deleted during the job.
>
> One point raised about Trait #1 is that it basically forbids modifying the
> files, because of concurrent access and the like. Personally, I'm not sure
> this is a problem. Moving the files to a task-specific location did solve the
> issue, though.
> I'm more concerned about Trait #2. Besides the issue described above, the
> deletion is implemented with the scheduler, which adds a lot of complexity to
> the current code. (It really is a pain to work on...)
> If we moved the deletion to the end of the job, it could be done as a clean-up
> step in the TaskManager. With this, we could reduce the DC to a
> cacheFile(String source) method plus the delete method in the TM, and throw out
> everything else.
> Also, with the current implementation big files may be copied multiple times,
> which may be undesirable depending on how big the files are.
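The proposal above (copy on request, delete everything in one clean-up step at the end of the job) can be sketched roughly as follows, assuming a per-job directory under some base path and a hook the TaskManager calls once a job finishes. `JobFileCache`, `cacheFile(String jobId, Path source)` and `cleanUpJob` are hypothetical names (the issue only mentions a `cacheFile(String source)` method); skipping the copy when the target already exists also keeps a big file from being copied more than once per job.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/**
 * Hypothetical sketch: files are copied into a per-job directory at most once
 * and removed by a single clean-up step when the job ends. Not Flink API.
 */
class JobFileCache {
    private final Path baseDir;

    JobFileCache(Path baseDir) {
        this.baseDir = baseDir;
    }

    /** Returns the job-local copy of source, copying only on the first request. */
    synchronized Path cacheFile(String jobId, Path source) throws IOException {
        Path jobDir = baseDir.resolve(jobId);
        Path target = jobDir.resolve(source.getFileName());
        if (!Files.exists(target)) {
            Files.createDirectories(jobDir);
            Files.copy(source, target);
        }
        return target;
    }

    /** Clean-up step the TaskManager would run once the job has finished. */
    synchronized void cleanUpJob(String jobId) throws IOException {
        Path jobDir = baseDir.resolve(jobId);
        if (!Files.exists(jobDir)) {
            return; // nothing was cached for this job
        }
        try (Stream<Path> paths = Files.walk(jobDir)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("dc-sketch");
        Path src = Files.createTempFile("input", ".txt");
        Files.write(src, "hello".getBytes(StandardCharsets.UTF_8));
        JobFileCache cache = new JobFileCache(base);
        Path first = cache.cacheFile("job1", src);
        Path second = cache.cacheFile("job1", src); // no second copy is made
        System.out.println(first.equals(second)); // true
        cache.cleanUpJob("job1");
        System.out.println(Files.exists(base.resolve("job1"))); // false
        Files.delete(src);
        Files.delete(base);
    }
}
```

With deletion tied to the end of the job, no timer or scheduler is needed to decide when a file is safe to remove, which is the simplification the issue argues for.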
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)