[ https://issues.apache.org/jira/browse/FLINK-9624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520330#comment-16520330 ]
ASF GitHub Bot commented on FLINK-9624: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6199#discussion_r197434489 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java --- @@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() { return this.userJarBlobKeys; } - /** - * Uploads the previously added user JAR files to the job manager through - * the job manager's BLOB server. The BLOB servers' address is given as a - * parameter. This function issues a blocking call. - * - * @param blobServerAddress of the blob server to upload the jars to - * @param blobClientConfig the blob client configuration - * @throws IOException Thrown, if the file upload to the JobManager failed. - */ - public void uploadUserJars( - InetSocketAddress blobServerAddress, - Configuration blobClientConfig) throws IOException { - if (!userJars.isEmpty()) { - List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles( - blobServerAddress, blobClientConfig, jobID, userJars); - - for (PermanentBlobKey blobKey : blobKeys) { - if (!userJarBlobKeys.contains(blobKey)) { - userJarBlobKeys.add(blobKey); - } - } - } - } - @Override public String toString() { return "JobGraph(jobId: " + jobID + ")"; } - /** - * Configures JobGraph with user specified artifacts. If the files are in local system it uploads them - * to the BLOB server, otherwise it just puts metadata for future remote access from within task executor. - * - * @param blobServerAddress of the blob server to upload the files to - * @param blobClientConfig the blob client configuration - * @throws IOException Thrown, if the file upload to the Blob server failed. - */ - public void uploadUserArtifacts( - InetSocketAddress blobServerAddress, - Configuration blobClientConfig) throws IOException { - - Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>(); - Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>(); - - for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) { - Path filePath = new Path(userArtifact.getValue().filePath); - - try { - if (filePath.getFileSystem().isDistributedFS()) { - distributeViaDFS.add(userArtifact); - } else { - uploadToBlobServer.add(userArtifact); - } - - } catch (IOException ex) { - distributeViaDFS.add(userArtifact); - } + public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) { + byte[] serializedBlobKey; + try { + serializedBlobKey = InstantiationUtil.serializeObject(blobKey); + } catch (IOException e) { + throw new FlinkRuntimeException("Could not serialize blobkey " + blobKey + ".", e); --- End diff -- I would not throw a `FlinkRuntimeException` here. Instead we could led the `IOException` bubble up. > Move jar/artifact upload logic out of JobGraph > ---------------------------------------------- > > Key: FLINK-9624 > URL: https://issues.apache.org/jira/browse/FLINK-9624 > Project: Flink > Issue Type: Improvement > Components: Job-Submission > Affects Versions: 1.6.0 > Reporter: Chesnay Schepler > Assignee: Chesnay Schepler > Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The {{JobGraph}} offers utility methods for uploading jars and artifacts to > the BlobService. > However, how these files are uploaded isn't a concern of theĀ {{JobGraph}} but > the submission-method, like theĀ {{RestClusterClient}}. > These methods should be moved into a utility class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)