[ 
https://issues.apache.org/jira/browse/FLINK-9624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520543#comment-16520543
 ] 

ASF GitHub Bot commented on FLINK-9624:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6199#discussion_r197496529
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
    @@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
                return this.userJarBlobKeys;
        }
     
    -   /**
    -    * Uploads the previously added user JAR files to the job manager 
through
    -    * the job manager's BLOB server. The BLOB servers' address is given as 
a
    -    * parameter. This function issues a blocking call.
    -    *
    -    * @param blobServerAddress of the blob server to upload the jars to
    -    * @param blobClientConfig the blob client configuration
    -    * @throws IOException Thrown, if the file upload to the JobManager 
failed.
    -    */
    -   public void uploadUserJars(
    -                   InetSocketAddress blobServerAddress,
    -                   Configuration blobClientConfig) throws IOException {
    -           if (!userJars.isEmpty()) {
    -                   List<PermanentBlobKey> blobKeys = 
BlobClient.uploadFiles(
    -                           blobServerAddress, blobClientConfig, jobID, 
userJars);
    -
    -                   for (PermanentBlobKey blobKey : blobKeys) {
    -                           if (!userJarBlobKeys.contains(blobKey)) {
    -                                   userJarBlobKeys.add(blobKey);
    -                           }
    -                   }
    -           }
    -   }
    -
        @Override
        public String toString() {
                return "JobGraph(jobId: " + jobID + ")";
        }
     
    -   /**
    -    * Configures JobGraph with user specified artifacts. If the files are 
in local system it uploads them
    -    * to the BLOB server, otherwise it just puts metadata for future 
remote access from within task executor.
    -    *
    -    * @param blobServerAddress of the blob server to upload the files to
    -    * @param blobClientConfig the blob client configuration
    -    * @throws IOException Thrown, if the file upload to the Blob server 
failed.
    -    */
    -   public void uploadUserArtifacts(
    -                   InetSocketAddress blobServerAddress,
    -                   Configuration blobClientConfig) throws IOException {
    -
    -           Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> 
uploadToBlobServer = new HashSet<>();
    -           Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> 
distributeViaDFS = new HashSet<>();
    -
    -           for (Map.Entry<String, DistributedCache.DistributedCacheEntry> 
userArtifact : userArtifacts.entrySet()) {
    -                   Path filePath = new 
Path(userArtifact.getValue().filePath);
    -
    -                   try {
    -                           if (filePath.getFileSystem().isDistributedFS()) 
{
    -                                   distributeViaDFS.add(userArtifact);
    -                           } else {
    -                                   uploadToBlobServer.add(userArtifact);
    -                           }
    -
    -                   } catch (IOException ex) {
    -                           distributeViaDFS.add(userArtifact);
    -                   }
    +   public void setUserArtifactBlobKey(String entryName, PermanentBlobKey 
blobKey) {
    +           byte[] serializedBlobKey;
    +           try {
    +                   serializedBlobKey = 
InstantiationUtil.serializeObject(blobKey);
    +           } catch (IOException e) {
    +                   throw new FlinkRuntimeException("Could not serialize 
blobkey " + blobKey + ".", e);
                }
    +           userArtifacts.computeIfPresent(entryName, (key, originalEntry) 
-> new DistributedCache.DistributedCacheEntry(
    +                   originalEntry.filePath,
    +                   originalEntry.isExecutable,
    +                   serializedBlobKey,
    +                   originalEntry.isZipped
    +           ));
    +   }
     
    -           uploadViaBlob(blobServerAddress, blobClientConfig, 
uploadToBlobServer);
    -
    -           for (Map.Entry<String, DistributedCache.DistributedCacheEntry> 
userArtifact : distributeViaDFS) {
    +   public void finalizeUserArtifactEntries() {
    --- End diff --
    
    https://issues.apache.org/jira/browse/FLINK-8713


> Move jar/artifact upload logic out of JobGraph
> ----------------------------------------------
>
>                 Key: FLINK-9624
>                 URL: https://issues.apache.org/jira/browse/FLINK-9624
>             Project: Flink
>          Issue Type: Improvement
>          Components: Job-Submission
>    Affects Versions: 1.6.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
>
>
> The {{JobGraph}} offers utility methods for uploading jars and artifacts to 
> the BlobService.
> However, how these files are uploaded isn't a concern of theĀ {{JobGraph}} but 
> the submission-method, like theĀ {{RestClusterClient}}.
> These methods should be moved into a utility class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to