[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411435#comment-16411435
 ] 

ASF GitHub Bot commented on FLINK-8620:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5580#discussion_r176745753
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -527,59 +403,83 @@ else if (response == RETURN_ERROR) {
         *              Any additional configuration for the blob client
         * @param jobId
         *              ID of the job this blob belongs to (or <tt>null</tt> if 
job-unrelated)
    -    * @param jars
    -    *              List of JAR files to upload
    +    * @param files
    +    *              List of files to upload
         *
         * @throws IOException
         *              if the upload fails
         */
    -   public static List<PermanentBlobKey> uploadJarFiles(
    -                   InetSocketAddress serverAddress, Configuration 
clientConfig, JobID jobId, List<Path> jars)
    +   public static List<PermanentBlobKey> uploadFiles(
    +                   InetSocketAddress serverAddress, Configuration 
clientConfig, JobID jobId, List<Path> files)
                        throws IOException {
     
                checkNotNull(jobId);
     
    -           if (jars.isEmpty()) {
    +           if (files.isEmpty()) {
                        return Collections.emptyList();
                } else {
                        List<PermanentBlobKey> blobKeys = new ArrayList<>();
     
                        try (BlobClient blobClient = new 
BlobClient(serverAddress, clientConfig)) {
    -                           for (final Path jar : jars) {
    -                                   final FileSystem fs = 
jar.getFileSystem();
    -                                   FSDataInputStream is = null;
    -                                   try {
    -                                           is = fs.open(jar);
    -                                           final PermanentBlobKey key =
    -                                                   (PermanentBlobKey) 
blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
    -                                           blobKeys.add(key);
    -                                   } finally {
    -                                           if (is != null) {
    -                                                   is.close();
    -                                           }
    -                                   }
    +                           for (final Path file : files) {
    +                                   final PermanentBlobKey key = 
blobClient.uploadFile(jobId, file);
    +                                   blobKeys.add(key);
                                }
                        }
     
                        return blobKeys;
                }
        }
     
    -   // 
--------------------------------------------------------------------------------------------
    -   //  Miscellaneous
    -   // 
--------------------------------------------------------------------------------------------
    -
    -   private static Throwable readExceptionFromStream(InputStream in) throws 
IOException {
    -           int len = readLength(in);
    -           byte[] bytes = new byte[len];
    -           readFully(in, bytes, 0, len, "Error message");
    +   /**
    +    * Uploads a single file to the {@link PermanentBlobService} of the 
given {@link BlobServer}.
    +    *
    +    * @param jobId
    +    *              ID of the job this blob belongs to (or <tt>null</tt> if 
job-unrelated)
    +    * @param file
    +    *              file to upload
    +    *
    +    * @throws IOException
    +    *              if the upload fails
    +    */
    +   public PermanentBlobKey uploadFile(JobID jobId, Path file) throws 
IOException {
    +           final FileSystem fs = file.getFileSystem();
    +           if (fs.getFileStatus(file).isDir()) {
    +                   return uploadDirectory(jobId, file, fs);
    +           } else {
    +                   try (InputStream is = fs.open(file)) {
    +                           return (PermanentBlobKey) putInputStream(jobId, 
is, PERMANENT_BLOB);
    +                   }
    +           }
    +   }
     
    -           try {
    -                   return (Throwable) 
InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
    +   private PermanentBlobKey uploadDirectory(JobID jobId, Path file, 
FileSystem fs) throws IOException {
    +           try (BlobOutputStream blobOutputStream = new 
BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
    +                   try (ZipOutputStream zipStream = new 
ZipOutputStream(blobOutputStream)) {
    +                           compressDirectoryToZipfile(fs, 
fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
    +                           zipStream.finish();
    +                           return (PermanentBlobKey) 
blobOutputStream.finish();
    +                   }
                }
    -           catch (ClassNotFoundException e) {
    -                   // should never occur
    -                   throw new IOException("Could not transfer error 
message", e);
    +   }
    +
    +   private static void compressDirectoryToZipfile(FileSystem fs, 
FileStatus rootDir, FileStatus sourceDir, ZipOutputStream out) throws 
IOException {
    +           for (FileStatus file : fs.listStatus(sourceDir.getPath())) {
    +                   LOG.info("Zipping file: {}", file);
    +                   if (file.isDir()) {
    +                           compressDirectoryToZipfile(fs, rootDir, file, 
out);
    +                   } else {
    +                           String entryName = 
file.getPath().getPath().replace(rootDir.getPath().getPath(), "");
    --- End diff --
    
    Unfortunately I could not — it removes only the part up to the root of the 
directory. If we have nested directories we need more than just `getName`.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-8620
>                 URL: https://issues.apache.org/jira/browse/FLINK-8620
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to