[FLINK-8966][tests] Upload user-jars in MiniCluster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3947a395 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3947a395 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3947a395 Branch: refs/heads/master Commit: 3947a395ecfe37de97be346416493d933cd3fe5a Parents: 2426f78 Author: zentol <ches...@apache.org> Authored: Wed Mar 14 12:49:11 2018 +0100 Committer: zentol <ches...@apache.org> Committed: Wed Apr 4 08:59:30 2018 +0200 ---------------------------------------------------------------------- .../flink/runtime/minicluster/MiniCluster.java | 38 +++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3947a395/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 2e826eb..64d46c6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -25,9 +25,12 @@ import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.WebOptions; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobCacheService; +import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; @@ -86,8 +89,10 @@ import java.net.InetSocketAddress; import java.net.URI; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -632,7 +637,10 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { // from the ResourceManager jobGraph.setAllowQueuedScheduling(true); - final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = dispatcherGateway.submitJob(jobGraph, rpcTimeout); + final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph); + + final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose( + (Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout)); return acknowledgeCompletableFuture.thenApply( (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID())); @@ -661,6 +669,34 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { } } + private CompletableFuture<Void> uploadAndSetJarFiles(final DispatcherGateway currentDispatcherGateway, final JobGraph job) { + List<Path> userJars = job.getUserJars(); + if (!userJars.isEmpty()) { + CompletableFuture<List<PermanentBlobKey>> jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), job.getUserJars()); + return jarUploadFuture.thenAccept(blobKeys -> { + for (PermanentBlobKey blobKey : blobKeys) { + job.addBlob(blobKey); + } + }); + } else { + LOG.debug("No jars to upload for job {}.", job.getJobID()); + return CompletableFuture.completedFuture(null); + } + } + + private CompletableFuture<List<PermanentBlobKey>> uploadJarFiles(final DispatcherGateway currentDispatcherGateway, final JobID jobId, final List<Path> jars) { + return currentDispatcherGateway.getBlobServerPort(rpcTimeout) + .thenApply(blobServerPort -> { + InetSocketAddress blobServerAddress = new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort); + + try { + return BlobClient.uploadJarFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars); + } catch (IOException ioe) { + throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe)); + } + }); + } + // ------------------------------------------------------------------------ // factories - can be overridden by subclasses to alter behavior // ------------------------------------------------------------------------