[FLINK-8966][tests] Upload user-jars in MiniCluster

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3947a395
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3947a395
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3947a395

Branch: refs/heads/master
Commit: 3947a395ecfe37de97be346416493d933cd3fe5a
Parents: 2426f78
Author: zentol <ches...@apache.org>
Authored: Wed Mar 14 12:49:11 2018 +0100
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 4 08:59:30 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/minicluster/MiniCluster.java  | 38 +++++++++++++++++++-
 1 file changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3947a395/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 2e826eb..64d46c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,9 +25,12 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -86,8 +89,10 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -632,7 +637,10 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                // from the ResourceManager
                jobGraph.setAllowQueuedScheduling(true);
 
-               final CompletableFuture<Acknowledge> 
acknowledgeCompletableFuture = dispatcherGateway.submitJob(jobGraph, 
rpcTimeout);
+               final CompletableFuture<Void> jarUploadFuture = 
uploadAndSetJarFiles(dispatcherGateway, jobGraph);
+
+               final CompletableFuture<Acknowledge> 
acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
+                       (Void ack) -> dispatcherGateway.submitJob(jobGraph, 
rpcTimeout));
 
                return acknowledgeCompletableFuture.thenApply(
                        (Acknowledge ignored) -> new 
JobSubmissionResult(jobGraph.getJobID()));
@@ -661,6 +669,34 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
                }
        }
 
+       private CompletableFuture<Void> uploadAndSetJarFiles(final 
DispatcherGateway currentDispatcherGateway, final JobGraph job) { // Uploads the job's user jars (if any) and records the resulting blob keys on the job graph.
+               List<Path> userJars = job.getUserJars();
+               if (!userJars.isEmpty()) {
+                       CompletableFuture<List<PermanentBlobKey>> 
jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), 
job.getUserJars()); // NOTE(review): calls job.getUserJars() again instead of reusing the local userJars
+                       return jarUploadFuture.thenAccept(blobKeys -> { // returned future completes only after every uploaded key was added to the job
+                                       for (PermanentBlobKey blobKey : 
blobKeys) {
+                                               job.addBlob(blobKey);
+                                       }
+                               });
+               } else {
+                       LOG.debug("No jars to upload for job {}.", 
job.getJobID());
+                       return CompletableFuture.completedFuture(null); // nothing to upload: hand back an already-completed future
+               }
+       }
+
+       private CompletableFuture<List<PermanentBlobKey>> uploadJarFiles(final 
DispatcherGateway currentDispatcherGateway, final JobID jobId, final List<Path> 
jars) { // Asks the dispatcher gateway for the blob server port, then uploads the given jars and yields their blob keys.
+               return currentDispatcherGateway.getBlobServerPort(rpcTimeout)
+                       .thenApply(blobServerPort -> { // NOTE(review): the upload runs inside this callback; the executing thread is not visible here -- confirm blocking I/O is acceptable
+                               InetSocketAddress blobServerAddress = new 
InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort); // blob server address = gateway hostname + the port just fetched
+
+                               try {
+                                       return 
BlobClient.uploadJarFiles(blobServerAddress, 
miniClusterConfiguration.getConfiguration(), jobId, jars);
+                               } catch (IOException ioe) {
+                                       throw new CompletionException(new 
FlinkException("Could not upload job jar files.", ioe)); // the callback cannot throw the checked IOException, so it is rethrown unchecked and fails the returned future
+                               }
+                       });
+       }
+
        // 
------------------------------------------------------------------------
        //  factories - can be overridden by subclasses to alter behavior
        // 
------------------------------------------------------------------------

Reply via email to