Repository: flink
Updated Branches:
  refs/heads/master 81839d7e3 -> dd4c8469b


[FLINK-9624][runtime] Move jar/artifact upload out of jobgraph

This closes #6199.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd4c8469
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd4c8469
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd4c8469

Branch: refs/heads/master
Commit: dd4c8469b11184b633d2b9514b9910622734270f
Parents: 81839d7
Author: zentol <[email protected]>
Authored: Wed Jun 13 18:21:21 2018 +0200
Committer: zentol <[email protected]>
Committed: Thu Jun 28 12:21:06 2018 +0200

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  |  16 +-
 .../webmonitor/handlers/JarRunHandler.java      |  14 +-
 .../flink/runtime/client/ClientUtils.java       | 127 +++++++++++++++
 .../apache/flink/runtime/client/JobClient.java  |  16 +-
 .../client/JobSubmissionClientActor.java        |  23 +--
 .../apache/flink/runtime/jobgraph/JobGraph.java |  91 ++---------
 .../flink/runtime/minicluster/MiniCluster.java  |  53 ++-----
 .../flink/runtime/client/ClientUtilsTest.java   | 154 +++++++++++++++++++
 .../flink/runtime/jobgraph/JobGraphTest.java    |  59 ++++---
 9 files changed, 362 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 8eb4ec0..85699d7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -30,7 +30,7 @@ import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -323,17 +323,11 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
 				final int blobServerPort = response.port;
 				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
-				final List<PermanentBlobKey> keys;
-				try {
-					log.info("Uploading jar files.");
-					keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-					jobGraph.uploadUserArtifacts(address, flinkConfig);
-				} catch (IOException ioe) {
-					throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
-				}
 
-				for (PermanentBlobKey key : keys) {
-					jobGraph.addUserJarBlobKey(key);
+				try {
+					ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
+				} catch (Exception e) {
+					throw new CompletionException(e);
 				}
 
 				return jobGraph;
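
The pattern above repeats at every call site touched by this commit (JarRunHandler, JobClient, JobSubmissionClientActor, MiniCluster): instead of handing the JobGraph a live connection, the caller passes a factory for one. The factory is only invoked when the JobGraph actually has jars or artifacts to upload, so empty jobs never open a blob-server connection. For reference, a simplified sketch of the supplier shape this relies on (the real interface is org.apache.flink.util.function.SupplierWithException in flink-core; this sketch is an approximation, not the actual source):

// Approximate shape of the checked-exception supplier used by ClientUtils.
@FunctionalInterface
interface SupplierWithException<R, E extends Throwable> {
	R get() throws E;
}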

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 0605bf1..10387c8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -25,7 +25,7 @@ import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -46,7 +46,6 @@ import akka.actor.AddressFromURIString;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -114,15 +113,10 @@ public class JarRunHandler extends
 
 		CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
 			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
-			final List<PermanentBlobKey> keys;
 			try {
-				keys = BlobClient.uploadFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
-			} catch (IOException ioe) {
-				throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
-			}
-
-			for (PermanentBlobKey key : keys) {
-				jobGraph.addUserJarBlobKey(key);
+				ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration));
+			} catch (FlinkException e) {
+				throw new CompletionException(e);
 			}
 
 			return jobGraph;

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
new file mode 100644
index 0000000..fc6a621
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * Contains utility methods for clients.
+ */
+public enum ClientUtils {
+	;
+
+	/**
+	 * Uploads all files required for the execution of the given {@link JobGraph} using the {@link BlobClient} from
+	 * the given {@link Supplier}.
+	 *
+	 * @param jobGraph jobgraph requiring files
+	 * @param clientSupplier supplier of blob client to upload files with
+	 * @throws FlinkException if the upload fails
+	 */
+	public static void uploadJobGraphFiles(JobGraph jobGraph, SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException {
+		List<Path> userJars = jobGraph.getUserJars();
+		Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
+		if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
+			try (BlobClient client = clientSupplier.get()) {
+				uploadAndSetUserJars(jobGraph, client);
+				uploadAndSetUserArtifacts(jobGraph, client);
+			} catch (IOException ioe) {
+				throw new FlinkException("Could not upload job files.", ioe);
+			}
+		}
+	}
+
+	/**
+	 * Uploads the user jars from the given {@link JobGraph} using the given {@link BlobClient},
+	 * and sets the appropriate blobkeys.
+	 *
+	 * @param jobGraph   jobgraph requiring user jars
+	 * @param blobClient client to upload jars with
+	 * @throws IOException if the upload fails
+	 */
+	private static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException {
+		Collection<PermanentBlobKey> blobKeys = uploadUserJars(jobGraph.getJobID(), jobGraph.getUserJars(), blobClient);
+		setUserJarBlobKeys(blobKeys, jobGraph);
+	}
+
+	private static Collection<PermanentBlobKey> uploadUserJars(JobID jobId, Collection<Path> userJars, BlobClient blobClient) throws IOException {
+		Collection<PermanentBlobKey> blobKeys = new ArrayList<>(userJars.size());
+		for (Path jar : userJars) {
+			final PermanentBlobKey blobKey = blobClient.uploadFile(jobId, jar);
+			blobKeys.add(blobKey);
+		}
+		return blobKeys;
+	}
+
+	private static void setUserJarBlobKeys(Collection<PermanentBlobKey> blobKeys, JobGraph jobGraph) {
+		blobKeys.forEach(jobGraph::addUserJarBlobKey);
+	}
+
+	/**
+	 * Uploads the user artifacts from the given {@link JobGraph} using the given {@link BlobClient},
+	 * and sets the appropriate blobkeys.
+	 *
+	 * @param jobGraph jobgraph requiring user artifacts
+	 * @param blobClient client to upload artifacts with
+	 * @throws IOException if the upload fails
+	 */
+	private static void uploadAndSetUserArtifacts(JobGraph jobGraph, BlobClient blobClient) throws IOException {
+		Collection<Tuple2<String, Path>> artifactPaths = jobGraph.getUserArtifacts().entrySet().stream()
+			.map(entry -> Tuple2.of(entry.getKey(), new Path(entry.getValue().filePath)))
+			.collect(Collectors.toList());
+
+		Collection<Tuple2<String, PermanentBlobKey>> blobKeys = uploadUserArtifacts(jobGraph.getJobID(), artifactPaths, blobClient);
+		setUserArtifactBlobKeys(jobGraph, blobKeys);
+	}
+
+	private static Collection<Tuple2<String, PermanentBlobKey>> uploadUserArtifacts(JobID jobID, Collection<Tuple2<String, Path>> userArtifacts, BlobClient blobClient) throws IOException {
+		Collection<Tuple2<String, PermanentBlobKey>> blobKeys = new ArrayList<>(userArtifacts.size());
+		for (Tuple2<String, Path> userArtifact : userArtifacts) {
+			// only upload local files
+			if (!userArtifact.f1.getFileSystem().isDistributedFS()) {
+				final PermanentBlobKey blobKey = blobClient.uploadFile(jobID, userArtifact.f1);
+				blobKeys.add(Tuple2.of(userArtifact.f0, blobKey));
+			}
+		}
+		return blobKeys;
+	}
+
+	private static void setUserArtifactBlobKeys(JobGraph jobGraph, Collection<Tuple2<String, PermanentBlobKey>> blobKeys) throws IOException {
+		for (Tuple2<String, PermanentBlobKey> blobKey : blobKeys) {
+			jobGraph.setUserArtifactBlobKey(blobKey.f0, blobKey.f1);
+		}
+		jobGraph.writeUserArtifactEntriesToConfiguration();
+	}
+}
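
Taken together, ClientUtils gives every client the same sequence: resolve the blob server address, pass uploadJobGraphFiles a BlobClient factory, then submit the enriched JobGraph. A hedged usage sketch (the wrapper class and method below are illustrative only, not part of the commit):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;

import java.net.InetSocketAddress;

// Hypothetical caller, for illustration only.
class JobFileUploadExample {

	static void uploadJobFiles(JobGraph jobGraph, InetSocketAddress blobServerAddress, Configuration config) throws FlinkException {
		// All user jars and all *local* user artifacts registered on the JobGraph
		// are uploaded through a single BlobClient; artifacts already on a
		// distributed file system are skipped and only recorded in the job
		// configuration.
		ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, config));
		// jobGraph now carries the PermanentBlobKeys and can be submitted.
	}
}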

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 76d9bd6..27da3b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedThrowable;
 
 import akka.actor.ActorRef;
@@ -422,18 +424,10 @@ public class JobClient {
 		}
 
 		try {
-			jobGraph.uploadUserJars(blobServerAddress, config);
-		}
-		catch (IOException e) {
-			throw new JobSubmissionException(jobGraph.getJobID(),
-				"Could not upload the program's JAR files to the JobManager.", e);
-		}
-
-		try {
-			jobGraph.uploadUserArtifacts(blobServerAddress, config);
-		} catch (IOException e) {
+			ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, config));
+		} catch (FlinkException e) {
 			throw new JobSubmissionException(jobGraph.getJobID(),
-					"Could not upload custom user artifacts to the job manager.", e);
+					"Could not upload job files.", e);
 		}
 
 		CompletableFuture<Acknowledge> submissionFuture = jobManagerGateway.submitJob(jobGraph, ListeningBehaviour.DETACHED, timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index 9b95633..2783b09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -31,13 +32,13 @@ import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedThrowable;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.Status;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -154,23 +155,9 @@ public class JobSubmissionClientActor extends JobClientActor {
 		final CompletableFuture<Void> jarUploadFuture = blobServerAddressFuture.thenAcceptAsync(
 			(InetSocketAddress blobServerAddress) -> {
 				try {
-					jobGraph.uploadUserJars(blobServerAddress, clientConfig);
-				} catch (IOException e) {
-					throw new CompletionException(
-						new JobSubmissionException(
-							jobGraph.getJobID(),
-							"Could not upload the jar files to the job manager.",
-							e));
-				}
-
-				try {
-					jobGraph.uploadUserArtifacts(blobServerAddress, clientConfig);
-				} catch (IOException e) {
-					throw new CompletionException(
-						new JobSubmissionException(
-							jobGraph.getJobID(),
-							"Could not upload custom user artifacts to the job manager.",
-							e));
+					ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, clientConfig));
+				} catch (FlinkException e) {
+					throw new CompletionException(e);
 				}
 			},
 			getContext().dispatcher());

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 7231383..b3e03de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.util.InstantiationUtil;
@@ -32,12 +31,10 @@ import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -552,68 +549,25 @@ public class JobGraph implements Serializable {
                return this.userJarBlobKeys;
        }
 
-	/**
-	 * Uploads the previously added user JAR files to the job manager through
-	 * the job manager's BLOB server. The BLOB servers' address is given as a
-	 * parameter. This function issues a blocking call.
-	 *
-	 * @param blobServerAddress of the blob server to upload the jars to
-	 * @param blobClientConfig the blob client configuration
-	 * @throws IOException Thrown, if the file upload to the JobManager failed.
-	 */
-	public void uploadUserJars(
-			InetSocketAddress blobServerAddress,
-			Configuration blobClientConfig) throws IOException {
-		if (!userJars.isEmpty()) {
-			List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
-				blobServerAddress, blobClientConfig, jobID, userJars);
-
-			for (PermanentBlobKey blobKey : blobKeys) {
-				if (!userJarBlobKeys.contains(blobKey)) {
-					userJarBlobKeys.add(blobKey);
-				}
-			}
-		}
-	}
-
 	@Override
 	public String toString() {
 		return "JobGraph(jobId: " + jobID + ")";
 	}
 
-	/**
-	 * Configures JobGraph with user specified artifacts. If the files are in local system it uploads them
-	 * to the BLOB server, otherwise it just puts metadata for future remote access from within task executor.
-	 *
-	 * @param blobServerAddress of the blob server to upload the files to
-	 * @param blobClientConfig the blob client configuration
-	 * @throws IOException Thrown, if the file upload to the Blob server failed.
-	 */
-	public void uploadUserArtifacts(
-			InetSocketAddress blobServerAddress,
-			Configuration blobClientConfig) throws IOException {
+	public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) throws IOException {
+		byte[] serializedBlobKey;
+		serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
 
-		Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
-		Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();
+		userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(
+			originalEntry.filePath,
+			originalEntry.isExecutable,
+			serializedBlobKey,
+			originalEntry.isZipped
+		));
+	}
 
+	public void writeUserArtifactEntriesToConfiguration() {
 		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
-			Path filePath = new Path(userArtifact.getValue().filePath);
-
-			try {
-				if (filePath.getFileSystem().isDistributedFS()) {
-					distributeViaDFS.add(userArtifact);
-				} else {
-					uploadToBlobServer.add(userArtifact);
-				}
-
-			} catch (IOException ex) {
-				distributeViaDFS.add(userArtifact);
-			}
-		}
-
-		uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);
-
-		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
 			DistributedCache.writeFileInfoToConfig(
 				userArtifact.getKey(),
 				userArtifact.getValue(),
@@ -621,27 +575,4 @@ public class JobGraph implements Serializable {
                        );
                }
        }
-
-	private void uploadViaBlob(
-			InetSocketAddress blobServerAddress,
-			Configuration clientConfig,
-			Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer) throws IOException {
-		if (!uploadToBlobServer.isEmpty()) {
-			try (BlobClient blobClient = new BlobClient(blobServerAddress, clientConfig)) {
-				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : uploadToBlobServer) {
-					final PermanentBlobKey key = blobClient.uploadFile(jobID,
-						new Path(userArtifact.getValue().filePath));
-
-					DistributedCache.writeFileInfoToConfig(
-						userArtifact.getKey(),
-						new DistributedCache.DistributedCacheEntry(
-							userArtifact.getValue().filePath,
-							userArtifact.getValue().isExecutable,
-							InstantiationUtil.serializeObject(key),
-							userArtifact.getValue().isZipped),
-						jobConfiguration);
-				}
-			}
-		}
-	}
 }
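
With the upload logic gone, JobGraph no longer opens network connections; it only records upload results. The old uploadUserArtifacts is split into a pure state update (setUserArtifactBlobKey) and a configuration write-out (writeUserArtifactEntriesToConfiguration) that now covers DFS-backed entries as well. A sketch of the call order ClientUtils follows (the driver class below is hypothetical):

import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.jobgraph.JobGraph;

import java.io.IOException;

// Hypothetical driver illustrating the two-step bookkeeping on JobGraph.
class ArtifactBlobKeyExample {

	static void recordUpload(JobGraph jobGraph, String entryName, PermanentBlobKey key) throws IOException {
		// 1) attach the serialized blob key to the matching user-artifact entry
		//    (a no-op if no entry with that name was registered)
		jobGraph.setUserArtifactBlobKey(entryName, key);
		// 2) after all keys are set, mirror every entry (uploaded or DFS-backed)
		//    into the job configuration for the task executors to read
		jobGraph.writeUserArtifactEntriesToConfiguration();
	}
}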

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index b89617b..4fab2b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,12 +25,11 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -90,7 +89,6 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -583,8 +581,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	public void runDetached(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");
 
-		uploadUserArtifacts(job);
-
 		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
 
 		try {
@@ -608,7 +604,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");
 
-		uploadUserArtifacts(job);
 		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
 
 		final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
@@ -631,15 +626,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		}
 	}
 
-	private void uploadUserArtifacts(JobGraph job) throws JobExecutionException {
-		try {
-			final InetSocketAddress blobAddress = new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort());
-			job.uploadUserArtifacts(blobAddress, miniClusterConfiguration.getConfiguration());
-		} catch (IOException e) {
-			throw new JobExecutionException(job.getJobID(), "Could not upload user artifacts", e);
-		}
-	}
-
 	public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
 		final DispatcherGateway dispatcherGateway;
 		try {
@@ -653,7 +639,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		// from the ResourceManager
 		jobGraph.setAllowQueuedScheduling(true);
 
-		final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);
+		final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGateway);
+
+		final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
 
 		final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
 			(Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));
@@ -685,32 +673,19 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		}
 	}
 
-	private CompletableFuture<Void> uploadAndSetJarFiles(final DispatcherGateway currentDispatcherGateway, final JobGraph job) {
-		List<Path> userJars = job.getUserJars();
-		if (!userJars.isEmpty()) {
-			CompletableFuture<List<PermanentBlobKey>> jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), job.getUserJars());
-			return jarUploadFuture.thenAccept(blobKeys -> {
-					for (PermanentBlobKey blobKey : blobKeys) {
-						job.addUserJarBlobKey(blobKey);
-					}
-				});
-		} else {
-			LOG.debug("No jars to upload for job {}.", job.getJobID());
-			return CompletableFuture.completedFuture(null);
-		}
+	private CompletableFuture<Void> uploadAndSetJobFiles(final CompletableFuture<InetSocketAddress> blobServerAddressFuture, final JobGraph job) {
+		return blobServerAddressFuture.thenAccept(blobServerAddress -> {
+			try {
+				ClientUtils.uploadJobGraphFiles(job, () -> new BlobClient(blobServerAddress, miniClusterConfiguration.getConfiguration()));
+			} catch (FlinkException e) {
+				throw new CompletionException(e);
+			}
+		});
 	}
 
-	private CompletableFuture<List<PermanentBlobKey>> uploadJarFiles(final DispatcherGateway currentDispatcherGateway, final JobID jobId, final List<Path> jars) {
+	private CompletableFuture<InetSocketAddress> createBlobServerAddress(final DispatcherGateway currentDispatcherGateway) {
 		return currentDispatcherGateway.getBlobServerPort(rpcTimeout)
-			.thenApply(blobServerPort -> {
-				InetSocketAddress blobServerAddress = new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort);
-
-				try {
-					return BlobClient.uploadFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
-				} catch (IOException ioe) {
-					throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
-				}
-			});
+			.thenApply(blobServerPort -> new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort));
 	}
 
 	// ------------------------------------------------------------------------
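
The MiniCluster now resolves the blob server address once, chains the upload onto that future, and only then submits; the synchronous uploadUserArtifacts calls disappear from runDetached and executeJobBlocking. The composition reduces to roughly the following shape (a stand-in sketch, not the actual MiniCluster code; uploadFiles is a placeholder for ClientUtils.uploadJobGraphFiles):

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Stand-in sketch of the submission pipeline: resolve address, upload, submit.
class SubmissionPipelineExample {

	static CompletableFuture<String> submit(CompletableFuture<Integer> blobServerPortFuture) {
		CompletableFuture<InetSocketAddress> addressFuture =
			blobServerPortFuture.thenApply(port -> new InetSocketAddress("localhost", port));

		CompletableFuture<Void> uploadFuture = addressFuture.thenAccept(address -> {
			try {
				uploadFiles(address);
			} catch (Exception e) {
				// checked exceptions cannot escape thenAccept, so they are wrapped
				// and resurface when the caller joins the future
				throw new CompletionException(e);
			}
		});

		// submission is chained onto the upload and never starts before it completes
		return uploadFuture.thenApply(ignored -> "submitted");
	}

	private static void uploadFiles(InetSocketAddress address) throws Exception {
		// no-op placeholder
	}
}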

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
new file mode 100644
index 0000000..dc14cb1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ClientUtils}.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       private static BlobServer blobServer = null;
+
+       @BeforeClass
+       public static void setup() throws IOException {
+               Configuration config = new Configuration();
+               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+                       temporaryFolder.newFolder().getAbsolutePath());
+               blobServer = new BlobServer(config, new VoidBlobStore());
+               blobServer.start();
+       }
+
+       @AfterClass
+       public static void teardown() throws IOException {
+               if (blobServer != null) {
+                       blobServer.close();
+               }
+       }
+
+	@Test
+	public void uploadAndSetUserJars() throws Exception {
+		java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+		JobGraph jobGraph = new JobGraph();
+
+		Collection<Path> jars = Arrays.asList(
+			new Path(Files.createFile(tmpDir.resolve("jar1.jar")).toString()),
+			new Path(Files.createFile(tmpDir.resolve("jar2.jar")).toString()));
+
+		jars.forEach(jobGraph::addJar);
+
+		assertEquals(jars.size(), jobGraph.getUserJars().size());
+		assertEquals(0, jobGraph.getUserJarBlobKeys().size());
+
+		ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
+
+		assertEquals(jars.size(), jobGraph.getUserJars().size());
+		assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
+		assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().stream().distinct().count());
+
+		for (PermanentBlobKey blobKey : jobGraph.getUserJarBlobKeys()) {
+			blobServer.getFile(jobGraph.getJobID(), blobKey);
+		}
+	}
+
+	@Test
+	public void uploadAndSetUserArtifacts() throws Exception {
+		java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+		JobGraph jobGraph = new JobGraph();
+
+		Collection<DistributedCache.DistributedCacheEntry> localArtifacts = Arrays.asList(
+			new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art1")).toString(), true, true),
+			new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art2")).toString(), true, false),
+			new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art3")).toString(), false, true),
+			new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art4")).toString(), true, false)
+		);
+
+		Collection<DistributedCache.DistributedCacheEntry> distributedArtifacts = Arrays.asList(
+			new DistributedCache.DistributedCacheEntry("hdfs://localhost:1234/test", true, false)
+		);
+
+		for (DistributedCache.DistributedCacheEntry entry : localArtifacts) {
+			jobGraph.addUserArtifact(entry.filePath, entry);
+		}
+		for (DistributedCache.DistributedCacheEntry entry : distributedArtifacts) {
+			jobGraph.addUserArtifact(entry.filePath, entry);
+		}
+
+		final int totalNumArtifacts = localArtifacts.size() + distributedArtifacts.size();
+
+		assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
+		assertEquals(0, jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count());
+
+		ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
+
+		assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
+		assertEquals(localArtifacts.size(), jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count());
+		assertEquals(distributedArtifacts.size(), jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey == null).count());
+		// 1 unique key for each local artifact, and null for distributed artifacts
+		assertEquals(localArtifacts.size() + 1, jobGraph.getUserArtifacts().values().stream().map(entry -> entry.blobKey).distinct().count());
+		for (DistributedCache.DistributedCacheEntry original : localArtifacts) {
+			assertState(original, jobGraph.getUserArtifacts().get(original.filePath), false, jobGraph.getJobID());
+		}
+		for (DistributedCache.DistributedCacheEntry original : distributedArtifacts) {
+			assertState(original, jobGraph.getUserArtifacts().get(original.filePath), true, jobGraph.getJobID());
+		}
+	}
+
+	private static void assertState(DistributedCache.DistributedCacheEntry original, DistributedCache.DistributedCacheEntry actual, boolean isBlobKeyNull, JobID jobId) throws Exception {
+		assertEquals(original.isZipped, actual.isZipped);
+		assertEquals(original.isExecutable, actual.isExecutable);
+		assertEquals(original.filePath, actual.filePath);
+		assertEquals(isBlobKeyNull, actual.blobKey == null);
+		if (!isBlobKeyNull) {
+			blobServer.getFile(
+				jobId,
+				InstantiationUtil.<PermanentBlobKey>deserializeObject(actual.blobKey, ClientUtilsTest.class.getClassLoader()));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index 227f6e3..160402b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -18,20 +18,23 @@
 
 package org.apache.flink.runtime.jobgraph;
 
-import static org.junit.Assert.*;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
 public class JobGraphTest extends TestLogger {
 
        @Test
@@ -263,22 +266,6 @@ public class JobGraphTest extends TestLogger {
                        fail(e.getMessage());
                }
        }
-
-	@Test
-	public void testConfiguringDistributedCache() throws Exception {
-		JobGraph testJob = new JobGraph("Test job");
-		testJob.addUserArtifact("dfsFile", new DistributedCache.DistributedCacheEntry("hdfs://tmp/file", false));
-
-		//it should never try to connect to that address
-		testJob.uploadUserArtifacts(new InetSocketAddress("localhost", 1111), new Configuration());
-
-		Configuration jobConfiguration = testJob.getJobConfiguration();
-		assertEquals(1, jobConfiguration.getInteger("DISTRIBUTED_CACHE_FILE_NUM", -1));
-		assertFalse(jobConfiguration.getBoolean("DISTRIBUTED_CACHE_FILE_DIR_1", true));
-		assertEquals("dfsFile", jobConfiguration.getString("DISTRIBUTED_CACHE_FILE_NAME_1", ""));
-		assertEquals("hdfs://tmp/file", jobConfiguration.getString("DISTRIBUTED_CACHE_FILE_PATH_1", ""));
-		assertFalse(jobConfiguration.getBoolean("DISTRIBUTED_CACHE_FILE_EXE_1", true));
-	}
        
 	private static final void assertBefore(JobVertex v1, JobVertex v2, List<JobVertex> list) {
                boolean seenFirst = false;
@@ -294,4 +281,32 @@ public class JobGraphTest extends TestLogger {
                        }
                }
        }
+
+	@Test
+	public void testSetUserArtifactBlobKey() throws IOException, ClassNotFoundException {
+		JobGraph jb = new JobGraph();
+
+		final DistributedCache.DistributedCacheEntry[] entries = {
+			new DistributedCache.DistributedCacheEntry("p1", true, true),
+			new DistributedCache.DistributedCacheEntry("p2", true, false),
+			new DistributedCache.DistributedCacheEntry("p3", false, true),
+			new DistributedCache.DistributedCacheEntry("p4", true, false),
+		};
+
+		for (DistributedCache.DistributedCacheEntry entry : entries) {
+			jb.addUserArtifact(entry.filePath, entry);
+		}
+
+		for (DistributedCache.DistributedCacheEntry entry : entries) {
+			PermanentBlobKey blobKey = new PermanentBlobKey();
+			jb.setUserArtifactBlobKey(entry.filePath, blobKey);
+
+			DistributedCache.DistributedCacheEntry jobGraphEntry = jb.getUserArtifacts().get(entry.filePath);
+			assertNotNull(jobGraphEntry);
+			assertEquals(blobKey, InstantiationUtil.deserializeObject(jobGraphEntry.blobKey, ClassLoader.getSystemClassLoader(), false));
+			assertEquals(entry.isExecutable, jobGraphEntry.isExecutable);
+			assertEquals(entry.isZipped, jobGraphEntry.isZipped);
+			assertEquals(entry.filePath, jobGraphEntry.filePath);
+		}
+	}
 }
