[ https://issues.apache.org/jira/browse/FLINK-9624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520328#comment-16520328 ]
ASF GitHub Bot commented on FLINK-9624:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6199#discussion_r197437750
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
		return this.userJarBlobKeys;
	}
-	/**
-	 * Uploads the previously added user JAR files to the job manager through
-	 * the job manager's BLOB server. The BLOB servers' address is given as a
-	 * parameter. This function issues a blocking call.
-	 *
-	 * @param blobServerAddress of the blob server to upload the jars to
-	 * @param blobClientConfig the blob client configuration
-	 * @throws IOException Thrown, if the file upload to the JobManager failed.
-	 */
-	public void uploadUserJars(
-			InetSocketAddress blobServerAddress,
-			Configuration blobClientConfig) throws IOException {
-		if (!userJars.isEmpty()) {
-			List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
-				blobServerAddress, blobClientConfig, jobID, userJars);
-
-			for (PermanentBlobKey blobKey : blobKeys) {
-				if (!userJarBlobKeys.contains(blobKey)) {
-					userJarBlobKeys.add(blobKey);
-				}
-			}
-		}
-	}
-
	@Override
	public String toString() {
		return "JobGraph(jobId: " + jobID + ")";
	}
-	/**
-	 * Configures JobGraph with user specified artifacts. If the files are in local system it uploads them
-	 * to the BLOB server, otherwise it just puts metadata for future remote access from within task executor.
-	 *
-	 * @param blobServerAddress of the blob server to upload the files to
-	 * @param blobClientConfig the blob client configuration
-	 * @throws IOException Thrown, if the file upload to the Blob server failed.
-	 */
-	public void uploadUserArtifacts(
-			InetSocketAddress blobServerAddress,
-			Configuration blobClientConfig) throws IOException {
-
-		Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
-		Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();
-
-		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
-			Path filePath = new Path(userArtifact.getValue().filePath);
-
-			try {
-				if (filePath.getFileSystem().isDistributedFS()) {
-					distributeViaDFS.add(userArtifact);
-				} else {
-					uploadToBlobServer.add(userArtifact);
-				}
-
-			} catch (IOException ex) {
-				distributeViaDFS.add(userArtifact);
-			}
+	public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) {
+		byte[] serializedBlobKey;
+		try {
+			serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
+		} catch (IOException e) {
+			throw new FlinkRuntimeException("Could not serialize blobkey " + blobKey + ".", e);
		}
+		userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(
+			originalEntry.filePath,
+			originalEntry.isExecutable,
+			serializedBlobKey,
+			originalEntry.isZipped
+		));
+	}
-		uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);
-
-		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
+	public void finalizeUserArtifactEntries() {
--- End diff --
I think we would not need this method if we did not write the `DistributedCacheEntries` into the configuration. If I'm not mistaken, we already send the `userArtifacts` map to the cluster anyway. The missing pieces are: adding a serial version UID to `DistributedCacheEntry`, and adding the `userArtifacts` to the `TaskDeploymentDescriptor` so that they are sent to the `TaskManager`.
> Move jar/artifact upload logic out of JobGraph
> ----------------------------------------------
>
> Key: FLINK-9624
> URL: https://issues.apache.org/jira/browse/FLINK-9624
> Project: Flink
> Issue Type: Improvement
> Components: Job-Submission
> Affects Versions: 1.6.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The {{JobGraph}} offers utility methods for uploading jars and artifacts to the BlobService.
> However, how these files are uploaded is not a concern of the {{JobGraph}} but of the submission method, e.g. the {{RestClusterClient}}.
> These methods should be moved into a utility class.
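For illustration, a rough sketch of such a utility class, assuming {{JobGraph}} exposes accessors like getUserJars() and addUserJarBlobKey(); the class name JobGraphUploadUtils is hypothetical and not the final API:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Hypothetical utility class: moves the upload logic out of JobGraph itself.
public final class JobGraphUploadUtils {

	private JobGraphUploadUtils() {
		// no instantiation
	}

	/**
	 * Uploads the user jars of the given job graph to the BLOB server and
	 * registers the resulting blob keys on the job graph.
	 */
	public static void uploadUserJars(
			JobGraph jobGraph,
			InetSocketAddress blobServerAddress,
			Configuration blobClientConfig) throws IOException {

		if (jobGraph.getUserJars().isEmpty()) {
			return;
		}

		List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
			blobServerAddress, blobClientConfig, jobGraph.getJobID(), jobGraph.getUserJars());

		for (PermanentBlobKey blobKey : blobKeys) {
			jobGraph.addUserJarBlobKey(blobKey);
		}
	}
}

A submission path such as the {{RestClusterClient}} could then call this helper before sending the {{JobGraph}}, keeping the upload mechanics out of the graph data structure itself.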
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)