[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411448#comment-16411448
 ] 

ASF GitHub Bot commented on FLINK-8620:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176747071
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
    @@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
        // 
------------------------------------------------------------------------
     
        /**
    -    * Asynchronous file copy process.
    -    */
    -   private static class CopyProcess implements Callable<Path> {
    -
    -           private final Path filePath;
    -           private final Path cachedPath;
    -           private boolean executable;
    -
    -           public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
    -                   this.filePath = new Path(e.filePath);
    -                   this.executable = e.isExecutable;
    -                   this.cachedPath = cachedPath;
    -           }
    -
    -           @Override
    -           public Path call() throws IOException {
    -                   // let exceptions propagate. we can retrieve them later 
from
    -                   // the future and report them upon access to the result
    -                   copy(filePath, cachedPath, this.executable);
    -                   return cachedPath;
    -           }
    -   }
    -
    -   /**
    -    * If no task is using this file after 5 seconds, clear it.
    +    * Asynchronous file copy process from blob server.
         */
    -   private static class DeleteProcess implements Runnable {
    +   private static class CopyFromBlobProcess implements Callable<Path> {
     
    -           private final Object lock;
    -           private final Map<JobID, Map<String, Tuple4<Integer, File, 
Path, Future<Path>>>> entries;
    -
    -           private final String name;
    +           private final PermanentBlobKey blobKey;
    +           private final Path target;
    +           private final boolean directory;
    +           private final boolean executable;
                private final JobID jobID;
    +           private final PermanentBlobService blobService;
     
    -           public DeleteProcess(Object lock, Map<JobID, Map<String, 
Tuple4<Integer, File, Path, Future<Path>>>> entries,
    -                                                           String name, 
JobID jobID) {
    -                   this.lock = lock;
    -                   this.entries = entries;
    -                   this.name = name;
    -                   this.jobID = jobID;
    +           CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
    +                   try {
    +                           this.executable = e.isExecutable;
    +                           this.directory = e.isZipped;
    +                           this.jobID = jobID;
    +                           this.blobService = blobService;
    +                           this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
    +                           this.target = target;
    +                   } catch (Exception ex) {
    +                           throw new RuntimeException(ex);
    +                   }
                }
     
                @Override
    -           public void run() {
    -                   try {
    -                           synchronized (lock) {
    -                                   Map<String, Tuple4<Integer, File, Path, 
Future<Path>>> jobEntries = entries.get(jobID);
    -
    -                                   if (jobEntries != null) {
    -                                           Tuple4<Integer, File, Path, 
Future<Path>> entry = jobEntries.get(name);
    -
    -                                           if (entry != null) {
    -                                                   int count = entry.f0;
    -                                                   if (count > 1) {
    -                                                           // multiple 
references still
    -                                                           entry.f0 = 
count - 1;
    -                                                   }
    -                                                   else {
    -                                                           // we remove 
the last reference
    -                                                           
jobEntries.remove(name);
    -                                                           if 
(jobEntries.isEmpty()) {
    -                                                                   
entries.remove(jobID);
    -                                                           }
    -
    -                                                           // abort the 
copy
    -                                                           
entry.f3.cancel(true);
    -
    -                                                           // remove the 
file
    -                                                           File file = new 
File(entry.f2.toString());
    -                                                           if 
(file.exists()) {
    -                                                                   if 
(file.isDirectory()) {
    -                                                                           
FileUtils.deleteDirectory(file);
    -                                                                   }
    -                                                                   else if 
(!file.delete()) {
    -                                                                           
LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
    -                                                                   }
    -                                                           }
    -
    -                                                           // remove the 
job wide temp directory, if it is now empty
    -                                                           File parent = 
entry.f1;
    -                                                           if 
(parent.isDirectory()) {
    -                                                                   
String[] children = parent.list();
    -                                                                   if 
(children == null || children.length == 0) {
    -                                                                           
//noinspection ResultOfMethodCallIgnored
    -                                                                           
parent.delete();
    -                                                                   }
    -                                                           }
    +           public Path call() throws IOException {
    +                   final File file = blobService.getFile(jobID, blobKey);
    +
    +                   if (directory) {
    +                           try (ZipInputStream zis = new 
ZipInputStream(new FileInputStream(file))) {
    +                                   ZipEntry entry;
    +                                   while ((entry = zis.getNextEntry()) != 
null) {
    --- End diff --
    
    It does not need to: getNextEntry() already does it, at the very beginning of the call.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-8620
>                 URL: https://issues.apache.org/jira/browse/FLINK-8620
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> We should be able to distribute custom files to TaskManagers. To do that, we 
> can store those files in the BlobStore and later access them on the TaskManagers 
> through the DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to