[ https://issues.apache.org/jira/browse/FLINK-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14295026#comment-14295026 ]

ASF GitHub Bot commented on FLINK-1419:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/339#discussion_r23680555
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---
    @@ -179,27 +179,40 @@ public Path call()  {
                
                private String name;
                private JobID jobID;
    -           private int oldCount;
    +           private String filePath;
     
                public DeleteProcess(String name, DistributedCacheEntry e, JobID jobID, int c) {
                        this.name = name;
                        this.jobID = jobID;
    -                   this.oldCount = c;
    +                   this.filePath = e.filePath;
                }
    +
                @Override
                public void run() {
    -                   synchronized (count) {
    -                           if (count.get(new ImmutablePair<JobID, String>(jobID, name)) != oldCount) {
    +                   Pair<JobID, String> key = new ImmutablePair<JobID, String>(jobID, name);
    +                   synchronized (lock) {
    +                           if (!count.containsKey(key)) {
                                        return;
                                }
    -                   }
    -                   Path tmp = getTempDir(jobID, "");
    -                   try {
    -                           if (lfs.exists(tmp)) {
    -                                   lfs.delete(tmp, true);
    +                           count.put(key, count.get(key) - 1);
    +                           if (count.get(key) != 0) {
    +                                   return;
    +                           }
    +                           Path tmp = getTempDir(jobID, filePath.substring(filePath.lastIndexOf("/") + 1));
    +                           try {
    +                                   if (lfs.exists(tmp)) {
    +                                           lfs.delete(tmp, true);
    +                                   }
    +                                   count.remove(key);
    +                                   if (count.isEmpty()) { //delete job directory
    --- End diff --
    
    This probably won't work, because other jobs may still be running on the
    TaskManager that have not yet deleted all of their files. The problem is
    the ```Pair<JobID, Filename>``` key: ```count.isEmpty()``` only becomes true
    once no job has any file left, so it says nothing about whether this
    particular job is done. Either we change count to something like
    ```HashMap[JobID, HashMap[String, Integer]]``` or we have to iterate over all
    entries to check whether an entry with the respective jobID still exists.
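
A minimal sketch of the first option, under stated assumptions: `JobFileRefCounter`, `acquire`, `release` and the `Cleanup` enum are hypothetical names and not part of the pull request, and a plain `String` stands in for `JobID` so that the example compiles on its own. It only illustrates how a per-job inner map lets the caller decide whether the job directory can go, independently of other jobs.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of per-job reference counting; not the actual FileCache code. */
class JobFileRefCounter {

    /** What the caller may delete after a release. */
    enum Cleanup { NOTHING, FILE, FILE_AND_JOB_DIR }

    private final Object lock = new Object();

    // jobId -> (file name -> number of tasks still using that file)
    private final Map<String, Map<String, Integer>> counts = new HashMap<>();

    /** Registers one more user of the given file for the given job. */
    void acquire(String jobId, String fileName) {
        synchronized (lock) {
            counts.computeIfAbsent(jobId, k -> new HashMap<>())
                  .merge(fileName, 1, Integer::sum);
        }
    }

    /** Drops one user and reports what has become safe to delete. */
    Cleanup release(String jobId, String fileName) {
        synchronized (lock) {
            Map<String, Integer> files = counts.get(jobId);
            if (files == null || !files.containsKey(fileName)) {
                return Cleanup.NOTHING; // unknown entry, nothing to clean up
            }
            int remaining = files.get(fileName) - 1;
            if (remaining > 0) {
                files.put(fileName, remaining);
                return Cleanup.NOTHING; // file is still in use
            }
            files.remove(fileName);
            if (!files.isEmpty()) {
                return Cleanup.FILE; // this job still holds other files
            }
            // Only this job's inner map is empty; other jobs are unaffected.
            counts.remove(jobId);
            return Cleanup.FILE_AND_JOB_DIR;
        }
    }
}
```

With this layout the "delete job directory" decision depends only on the inner map of the releasing job, so files of other jobs on the same TaskManager no longer block it.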


> DistributedCache doesn't preserve files for subsequent operations
> -----------------------------------------------------------------
>
>                 Key: FLINK-1419
>                 URL: https://issues.apache.org/jira/browse/FLINK-1419
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 0.8, 0.9
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>
> When subsequent operations want to access the same files in the DC, it 
> frequently happens that the files are not created for the following operation.
> This is fairly odd, since the DC is supposed to either a) preserve files when 
> another operation kicks in within a certain time window, or b) just recreate 
> the deleted files. Neither happens.
> Increasing the time window had no effect.
> I'd like to use this issue as a starting point for a more general discussion 
> about the DistributedCache. 
> Currently:
> 1. all files reside in a common job-specific directory
> 2. files are deleted during the job.
>  
> One thing that was brought up about Trait 1 is that it basically forbids 
> modification of the files, concurrent access and all. Personally I'm not sure 
> if this is a problem. Changing it to a task-specific place solved the issue 
> though.
> I'm more concerned about Trait 2. Besides the mentioned issue, the deletion 
> is realized with the scheduler, which adds a lot of complexity to the current 
> code. (It really is a pain to work on...) 
> If we moved the deletion to the end of the job, it could be done as a clean-up 
> step in the TaskManager. With this we could reduce the DC to a 
> cacheFile(String source) method, the delete method in the TM, and throw out 
> everything else.
> Also, the current implementation implies that big files may be copied 
> multiple times. This may be undesired, depending on how big the files are.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
