Repository: flink
Updated Branches:
  refs/heads/master 81839d7e3 -> dd4c8469b
[FLINK-9624][runtime] Move jar/artifact upload out of jobgraph

This closes #6199.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd4c8469
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd4c8469
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd4c8469

Branch: refs/heads/master
Commit: dd4c8469b11184b633d2b9514b9910622734270f
Parents: 81839d7
Author: zentol <[email protected]>
Authored: Wed Jun 13 18:21:21 2018 +0200
Committer: zentol <[email protected]>
Committed: Thu Jun 28 12:21:06 2018 +0200

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  |  16 +-
 .../webmonitor/handlers/JarRunHandler.java      |  14 +-
 .../flink/runtime/client/ClientUtils.java       | 127 +++++++++++++++
 .../apache/flink/runtime/client/JobClient.java  |  16 +-
 .../client/JobSubmissionClientActor.java        |  23 +--
 .../apache/flink/runtime/jobgraph/JobGraph.java |  91 ++---------
 .../flink/runtime/minicluster/MiniCluster.java  |  53 ++-----
 .../flink/runtime/client/ClientUtilsTest.java   | 154 +++++++++++++++++++
 .../flink/runtime/jobgraph/JobGraphTest.java    |  59 ++++---
 9 files changed, 362 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 8eb4ec0..85699d7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -30,7 +30,7 @@ import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -323,17 +323,11 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
 				final int blobServerPort = response.port;
 				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
-				final List<PermanentBlobKey> keys;
-				try {
-					log.info("Uploading jar files.");
-					keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-					jobGraph.uploadUserArtifacts(address, flinkConfig);
-				} catch (IOException ioe) {
-					throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
-				}
 
-				for (PermanentBlobKey key : keys) {
-					jobGraph.addUserJarBlobKey(key);
+				try {
+					ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
+				} catch (Exception e) {
+					throw new CompletionException(e);
 				}

 				return jobGraph;
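
The hunk above is representative of the whole commit: every call site that previously uploaded jars and artifacts in two separate steps now delegates to a single ClientUtils call. As a minimal sketch of the new pattern (reusing the address and flinkConfig variables from the hunk above):

    // One call now covers both user jars and local user artifacts; the
    // BlobClient is created lazily by the supplier, so no connection is
    // opened when the job has no files to upload.
    try {
        ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
    } catch (FlinkException e) {
        throw new CompletionException(e);
    }
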
http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 0605bf1..10387c8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -25,7 +25,7 @@ import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -46,7 +46,6 @@ import akka.actor.AddressFromURIString;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;

-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -114,15 +113,10 @@ public class JarRunHandler extends
 		CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
 			final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
-			final List<PermanentBlobKey> keys;
 			try {
-				keys = BlobClient.uploadFiles(address, configuration, jobGraph.getJobID(), jobGraph.getUserJars());
-			} catch (IOException ioe) {
-				throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
-			}
-
-			for (PermanentBlobKey key : keys) {
-				jobGraph.addUserJarBlobKey(key);
+				ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration));
+			} catch (FlinkException e) {
+				throw new CompletionException(e);
 			}

 			return jobGraph;

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
new file mode 100644
index 0000000..fc6a621
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * Contains utility methods for clients.
+ */
+public enum ClientUtils {
+	;
+
+	/**
+	 * Uploads all files required for the execution of the given {@link JobGraph} using the {@link BlobClient} from
+	 * the given {@link Supplier}.
+	 *
+	 * @param jobGraph jobgraph requiring files
+	 * @param clientSupplier supplier of blob client to upload files with
+	 * @throws FlinkException if the upload fails
+	 */
+	public static void uploadJobGraphFiles(JobGraph jobGraph, SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException {
+		List<Path> userJars = jobGraph.getUserJars();
+		Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
+		if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
+			try (BlobClient client = clientSupplier.get()) {
+				uploadAndSetUserJars(jobGraph, client);
+				uploadAndSetUserArtifacts(jobGraph, client);
+			} catch (IOException ioe) {
+				throw new FlinkException("Could not upload job files.", ioe);
+			}
+		}
+	}
+
+	/**
+	 * Uploads the user jars from the given {@link JobGraph} using the given {@link BlobClient},
+	 * and sets the appropriate blobkeys.
+	 *
+	 * @param jobGraph jobgraph requiring user jars
+	 * @param blobClient client to upload jars with
+	 * @throws IOException if the upload fails
+	 */
+	private static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException {
+		Collection<PermanentBlobKey> blobKeys = uploadUserJars(jobGraph.getJobID(), jobGraph.getUserJars(), blobClient);
+		setUserJarBlobKeys(blobKeys, jobGraph);
+	}
+
+	private static Collection<PermanentBlobKey> uploadUserJars(JobID jobId, Collection<Path> userJars, BlobClient blobClient) throws IOException {
+		Collection<PermanentBlobKey> blobKeys = new ArrayList<>(userJars.size());
+		for (Path jar : userJars) {
+			final PermanentBlobKey blobKey = blobClient.uploadFile(jobId, jar);
+			blobKeys.add(blobKey);
+		}
+		return blobKeys;
+	}
+
+	private static void setUserJarBlobKeys(Collection<PermanentBlobKey> blobKeys, JobGraph jobGraph) {
+		blobKeys.forEach(jobGraph::addUserJarBlobKey);
+	}
+
+	/**
+	 * Uploads the user artifacts from the given {@link JobGraph} using the given {@link BlobClient},
+	 * and sets the appropriate blobkeys.
+	 *
+	 * @param jobGraph jobgraph requiring user artifacts
+	 * @param blobClient client to upload artifacts with
+	 * @throws IOException if the upload fails
+	 */
+	private static void uploadAndSetUserArtifacts(JobGraph jobGraph, BlobClient blobClient) throws IOException {
+		Collection<Tuple2<String, Path>> artifactPaths = jobGraph.getUserArtifacts().entrySet().stream()
+			.map(entry -> Tuple2.of(entry.getKey(), new Path(entry.getValue().filePath)))
+			.collect(Collectors.toList());
+
+		Collection<Tuple2<String, PermanentBlobKey>> blobKeys = uploadUserArtifacts(jobGraph.getJobID(), artifactPaths, blobClient);
+		setUserArtifactBlobKeys(jobGraph, blobKeys);
+	}
+
+	private static Collection<Tuple2<String, PermanentBlobKey>> uploadUserArtifacts(JobID jobID, Collection<Tuple2<String, Path>> userArtifacts, BlobClient blobClient) throws IOException {
+		Collection<Tuple2<String, PermanentBlobKey>> blobKeys = new ArrayList<>(userArtifacts.size());
+		for (Tuple2<String, Path> userArtifact : userArtifacts) {
+			// only upload local files
+			if (!userArtifact.f1.getFileSystem().isDistributedFS()) {
+				final PermanentBlobKey blobKey = blobClient.uploadFile(jobID, userArtifact.f1);
+				blobKeys.add(Tuple2.of(userArtifact.f0, blobKey));
+			}
+		}
+		return blobKeys;
+	}
+
+	private static void setUserArtifactBlobKeys(JobGraph jobGraph, Collection<Tuple2<String, PermanentBlobKey>> blobKeys) throws IOException {
+		for (Tuple2<String, PermanentBlobKey> blobKey : blobKeys) {
+			jobGraph.setUserArtifactBlobKey(blobKey.f0, blobKey.f1);
+		}
+		jobGraph.writeUserArtifactEntriesToConfiguration();
+	}
+}
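
For callers, the new utility needs only a JobGraph and a recipe for constructing a BlobClient. A minimal usage sketch, assuming a reachable blob server (the helper name, address, and configuration are placeholders, not values from this commit):

    // Hypothetical helper: uploads user jars and local user artifacts in one
    // step and records the resulting PermanentBlobKeys on the JobGraph.
    static void uploadAll(JobGraph jobGraph, InetSocketAddress blobAddress, Configuration config) throws FlinkException {
        ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobAddress, config));
    }

Note that the supplier is a SupplierWithException<BlobClient, IOException>, so the checked IOException thrown by the BlobClient constructor can propagate out of the lambda without wrapping, unlike with java.util.function.Supplier.
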
to the job manager.", e); + "Could not upload job files.", e); } CompletableFuture<Acknowledge> submissionFuture = jobManagerGateway.submitJob(jobGraph, ListeningBehaviour.DETACHED, timeout); http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java index 9b95633..2783b09 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaJobManagerGateway; import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -31,13 +32,13 @@ import org.apache.flink.runtime.messages.JobClientMessages; import org.apache.flink.runtime.messages.JobClientMessages.SubmitJobAndWait; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedThrowable; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.Status; -import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -154,23 +155,9 @@ public class JobSubmissionClientActor extends JobClientActor { final CompletableFuture<Void> jarUploadFuture = blobServerAddressFuture.thenAcceptAsync( (InetSocketAddress blobServerAddress) -> { try { - jobGraph.uploadUserJars(blobServerAddress, clientConfig); - } catch (IOException e) { - throw new CompletionException( - new JobSubmissionException( - jobGraph.getJobID(), - "Could not upload the jar files to the job manager.", - e)); - } - - try { - jobGraph.uploadUserArtifacts(blobServerAddress, clientConfig); - } catch (IOException e) { - throw new CompletionException( - new JobSubmissionException( - jobGraph.getJobID(), - "Could not upload custom user artifacts to the job manager.", - e)); + ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, clientConfig)); + } catch (FlinkException e) { + throw new CompletionException(e); } }, getContext().dispatcher()); http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 7231383..b3e03de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.configuration.Configuration; import 
-import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.util.InstantiationUtil;
@@ -32,12 +31,10 @@ import org.apache.flink.util.SerializedValue;

 import java.io.IOException;
 import java.io.Serializable;
-import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -552,68 +549,25 @@ public class JobGraph implements Serializable {
 		return this.userJarBlobKeys;
 	}

-	/**
-	 * Uploads the previously added user JAR files to the job manager through
-	 * the job manager's BLOB server. The BLOB servers' address is given as a
-	 * parameter. This function issues a blocking call.
-	 *
-	 * @param blobServerAddress of the blob server to upload the jars to
-	 * @param blobClientConfig the blob client configuration
-	 * @throws IOException Thrown, if the file upload to the JobManager failed.
-	 */
-	public void uploadUserJars(
-			InetSocketAddress blobServerAddress,
-			Configuration blobClientConfig) throws IOException {
-		if (!userJars.isEmpty()) {
-			List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
-				blobServerAddress, blobClientConfig, jobID, userJars);
-
-			for (PermanentBlobKey blobKey : blobKeys) {
-				if (!userJarBlobKeys.contains(blobKey)) {
-					userJarBlobKeys.add(blobKey);
-				}
-			}
-		}
-	}
-
 	@Override
 	public String toString() {
 		return "JobGraph(jobId: " + jobID + ")";
 	}

-	/**
-	 * Configures JobGraph with user specified artifacts. If the files are in local system it uploads them
-	 * to the BLOB server, otherwise it just puts metadata for future remote access from within task executor.
-	 *
-	 * @param blobServerAddress of the blob server to upload the files to
-	 * @param blobClientConfig the blob client configuration
-	 * @throws IOException Thrown, if the file upload to the Blob server failed.
-	 */
-	public void uploadUserArtifacts(
-			InetSocketAddress blobServerAddress,
-			Configuration blobClientConfig) throws IOException {
+	public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) throws IOException {
+		byte[] serializedBlobKey;
+		serializedBlobKey = InstantiationUtil.serializeObject(blobKey);

-		Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer = new HashSet<>();
-		Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> distributeViaDFS = new HashSet<>();
+		userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(
+			originalEntry.filePath,
+			originalEntry.isExecutable,
+			serializedBlobKey,
+			originalEntry.isZipped
+		));
+	}

+	public void writeUserArtifactEntriesToConfiguration() {
 		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
-			Path filePath = new Path(userArtifact.getValue().filePath);
-
-			try {
-				if (filePath.getFileSystem().isDistributedFS()) {
-					distributeViaDFS.add(userArtifact);
-				} else {
-					uploadToBlobServer.add(userArtifact);
-				}
-
-			} catch (IOException ex) {
-				distributeViaDFS.add(userArtifact);
-			}
-		}
-
-		uploadViaBlob(blobServerAddress, blobClientConfig, uploadToBlobServer);
-
-		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : distributeViaDFS) {
 			DistributedCache.writeFileInfoToConfig(
 				userArtifact.getKey(),
 				userArtifact.getValue(),
@@ -621,27 +575,4 @@ public class JobGraph implements Serializable {
 			);
 		}
 	}
-
-	private void uploadViaBlob(
-			InetSocketAddress blobServerAddress,
-			Configuration clientConfig,
-			Set<Map.Entry<String, DistributedCache.DistributedCacheEntry>> uploadToBlobServer) throws IOException {
-		if (!uploadToBlobServer.isEmpty()) {
-			try (BlobClient blobClient = new BlobClient(blobServerAddress, clientConfig)) {
-				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : uploadToBlobServer) {
-					final PermanentBlobKey key = blobClient.uploadFile(jobID,
-						new Path(userArtifact.getValue().filePath));
-
-					DistributedCache.writeFileInfoToConfig(
-						userArtifact.getKey(),
-						new DistributedCache.DistributedCacheEntry(
-							userArtifact.getValue().filePath,
-							userArtifact.getValue().isExecutable,
-							InstantiationUtil.serializeObject(key),
-							userArtifact.getValue().isZipped),
-						jobConfiguration);
-				}
-			}
-		}
-	}
 }
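
With the upload logic gone, JobGraph keeps only passive setters: ClientUtils performs the upload, calls setUserArtifactBlobKey() once per artifact, and finishes with a single writeUserArtifactEntriesToConfiguration() pass. A minimal sketch of that two-step interaction (the artifact name and path are placeholders, and the freshly constructed PermanentBlobKey stands in for one returned by BlobClient#uploadFile):

    // Hypothetical illustration of the new JobGraph surface: attach a blob
    // key to a previously registered artifact, then mirror all artifact
    // entries into the job configuration for the task executors.
    static void recordArtifact(JobGraph jobGraph) throws IOException {
        jobGraph.addUserArtifact("words", new DistributedCache.DistributedCacheEntry("file:///tmp/words.txt", false));
        jobGraph.setUserArtifactBlobKey("words", new PermanentBlobKey());
        jobGraph.writeUserArtifactEntriesToConfiguration();
    }
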
http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index b89617b..4fab2b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -25,12 +25,11 @@ import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobServer;
-import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
@@ -90,7 +89,6 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -583,8 +581,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	public void runDetached(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");

-		uploadUserArtifacts(job);
-
 		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

 		try {
@@ -608,7 +604,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 	public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
 		checkNotNull(job, "job is null");

-		uploadUserArtifacts(job);
 		final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

 		final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
@@ -631,15 +626,6 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		}
 	}

-	private void uploadUserArtifacts(JobGraph job) throws JobExecutionException {
-		try {
-			final InetSocketAddress blobAddress = new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort());
-			job.uploadUserArtifacts(blobAddress, miniClusterConfiguration.getConfiguration());
-		} catch (IOException e) {
-			throw new JobExecutionException(job.getJobID(), "Could not upload user artifacts", e);
-		}
-	}
-
 	public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
 		final DispatcherGateway dispatcherGateway;
 		try {
@@ -653,7 +639,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		// from the ResourceManager
 		jobGraph.setAllowQueuedScheduling(true);

-		final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);
+		final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGateway);
+
+		final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);

 		final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
 			(Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));
@@ -685,32 +673,19 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 		}
 	}

-	private CompletableFuture<Void> uploadAndSetJarFiles(final DispatcherGateway currentDispatcherGateway, final JobGraph job) {
-		List<Path> userJars = job.getUserJars();
-		if (!userJars.isEmpty()) {
-			CompletableFuture<List<PermanentBlobKey>> jarUploadFuture = uploadJarFiles(currentDispatcherGateway, job.getJobID(), job.getUserJars());
-			return jarUploadFuture.thenAccept(blobKeys -> {
-				for (PermanentBlobKey blobKey : blobKeys) {
-					job.addUserJarBlobKey(blobKey);
-				}
-			});
-		} else {
-			LOG.debug("No jars to upload for job {}.", job.getJobID());
-			return CompletableFuture.completedFuture(null);
-		}
+	private CompletableFuture<Void> uploadAndSetJobFiles(final CompletableFuture<InetSocketAddress> blobServerAddressFuture, final JobGraph job) {
+		return blobServerAddressFuture.thenAccept(blobServerAddress -> {
+			try {
+				ClientUtils.uploadJobGraphFiles(job, () -> new BlobClient(blobServerAddress, miniClusterConfiguration.getConfiguration()));
+			} catch (FlinkException e) {
+				throw new CompletionException(e);
+			}
+		});
 	}

-	private CompletableFuture<List<PermanentBlobKey>> uploadJarFiles(final DispatcherGateway currentDispatcherGateway, final JobID jobId, final List<Path> jars) {
+	private CompletableFuture<InetSocketAddress> createBlobServerAddress(final DispatcherGateway currentDispatcherGateway) {
 		return currentDispatcherGateway.getBlobServerPort(rpcTimeout)
-			.thenApply(blobServerPort -> {
-				InetSocketAddress blobServerAddress = new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort);
-
-				try {
-					return BlobClient.uploadFiles(blobServerAddress, miniClusterConfiguration.getConfiguration(), jobId, jars);
-				} catch (IOException ioe) {
-					throw new CompletionException(new FlinkException("Could not upload job jar files.", ioe));
-				}
-			});
+			.thenApply(blobServerPort -> new InetSocketAddress(currentDispatcherGateway.getHostname(), blobServerPort));
 	}

 	// ------------------------------------------------------------------------
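
The MiniCluster now resolves the blob server address once and feeds that future into the upload step; a similar shape appears in RestClusterClient and JarRunHandler. Roughly, using the method names from the hunks above (error handling elided):

    // Sketch: resolve the blob server address, upload job files, then submit.
    CompletableFuture<InetSocketAddress> addressFuture = createBlobServerAddress(dispatcherGateway);
    CompletableFuture<Void> uploadFuture = uploadAndSetJobFiles(addressFuture, jobGraph);
    CompletableFuture<Acknowledge> ackFuture = uploadFuture.thenCompose(
        (Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));
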
http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
new file mode 100644
index 0000000..dc14cb1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ClientUtils}.
+ */
+public class ClientUtilsTest extends TestLogger {
+
+	@ClassRule
+	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	private static BlobServer blobServer = null;
+
+	@BeforeClass
+	public static void setup() throws IOException {
+		Configuration config = new Configuration();
+		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+			temporaryFolder.newFolder().getAbsolutePath());
+		blobServer = new BlobServer(config, new VoidBlobStore());
+		blobServer.start();
+	}
+
+	@AfterClass
+	public static void teardown() throws IOException {
+		if (blobServer != null) {
+			blobServer.close();
+		}
+	}
+
+	@Test
+	public void uploadAndSetUserJars() throws Exception {
+		java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+		JobGraph jobGraph = new JobGraph();
+
+		Collection<Path> jars = Arrays.asList(
+			new Path(Files.createFile(tmpDir.resolve("jar1.jar")).toString()),
+			new Path(Files.createFile(tmpDir.resolve("jar2.jar")).toString()));
+
+		jars.forEach(jobGraph::addJar);
+
+		assertEquals(jars.size(), jobGraph.getUserJars().size());
+		assertEquals(0, jobGraph.getUserJarBlobKeys().size());
+
+		ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
+
+		assertEquals(jars.size(), jobGraph.getUserJars().size());
+		assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
+		assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().stream().distinct().count());
+
+		for (PermanentBlobKey blobKey : jobGraph.getUserJarBlobKeys()) {
+			blobServer.getFile(jobGraph.getJobID(), blobKey);
+		}
+	}
+
+	@Test
+	public void uploadAndSetUserArtifacts() throws Exception {
+		java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
+		JobGraph jobGraph = new JobGraph();
+
+		Collection<DistributedCache.DistributedCacheEntry> localArtifacts = Arrays.asList(
+			new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art1")).toString(), true, true),
+			new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art2")).toString(), true, false),
+			new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art3")).toString(), false, true),
+			new DistributedCache.DistributedCacheEntry(Files.createFile(tmpDir.resolve("art4")).toString(), true, false)
+		);
+
+		Collection<DistributedCache.DistributedCacheEntry> distributedArtifacts = Arrays.asList(
+			new DistributedCache.DistributedCacheEntry("hdfs://localhost:1234/test", true, false)
+		);
+
+		for (DistributedCache.DistributedCacheEntry entry : localArtifacts) {
+			jobGraph.addUserArtifact(entry.filePath, entry);
+		}
+		for (DistributedCache.DistributedCacheEntry entry : distributedArtifacts) {
+			jobGraph.addUserArtifact(entry.filePath, entry);
+		}
+
+		final int totalNumArtifacts = localArtifacts.size() + distributedArtifacts.size();
+
+		assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
+		assertEquals(0, jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count());
+
+		ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration()));
+
+		assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
+		assertEquals(localArtifacts.size(), jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count());
+		assertEquals(distributedArtifacts.size(), jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey == null).count());
+		// 1 unique key for each local artifact, and null for distributed artifacts
+		assertEquals(localArtifacts.size() + 1, jobGraph.getUserArtifacts().values().stream().map(entry -> entry.blobKey).distinct().count());
+		for (DistributedCache.DistributedCacheEntry original : localArtifacts) {
+			assertState(original, jobGraph.getUserArtifacts().get(original.filePath), false, jobGraph.getJobID());
+		}
+		for (DistributedCache.DistributedCacheEntry original : distributedArtifacts) {
+			assertState(original, jobGraph.getUserArtifacts().get(original.filePath), true, jobGraph.getJobID());
+		}
+	}
+
+	private static void assertState(DistributedCache.DistributedCacheEntry original, DistributedCache.DistributedCacheEntry actual, boolean isBlobKeyNull, JobID jobId) throws Exception {
+		assertEquals(original.isZipped, actual.isZipped);
+		assertEquals(original.isExecutable, actual.isExecutable);
+		assertEquals(original.filePath, actual.filePath);
+		assertEquals(isBlobKeyNull, actual.blobKey == null);
+		if (!isBlobKeyNull) {
+			blobServer.getFile(
+				jobId,
+				InstantiationUtil.<PermanentBlobKey>deserializeObject(actual.blobKey, ClientUtilsTest.class.getClassLoader()));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dd4c8469/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
index 227f6e3..160402b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java
@@ -18,20 +18,23 @@

 package org.apache.flink.runtime.jobgraph;

-import static org.junit.Assert.*;
-
-import java.net.InetSocketAddress;
-import java.util.List;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;

 import org.junit.Test;

+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
 public class JobGraphTest extends TestLogger {

 	@Test
@@ -263,22 +266,6 @@ public class JobGraphTest extends TestLogger {
 			fail(e.getMessage());
 		}
 	}
-
-	@Test
-	public void testConfiguringDistributedCache() throws Exception {
-		JobGraph testJob = new JobGraph("Test job");
-		testJob.addUserArtifact("dfsFile", new DistributedCache.DistributedCacheEntry("hdfs://tmp/file", false));
-
-		//it should never try to connect to that address
-		testJob.uploadUserArtifacts(new InetSocketAddress("localhost", 1111), new Configuration());
-
-		Configuration jobConfiguration = testJob.getJobConfiguration();
-		assertEquals(1, jobConfiguration.getInteger("DISTRIBUTED_CACHE_FILE_NUM", -1));
-		assertFalse(jobConfiguration.getBoolean("DISTRIBUTED_CACHE_FILE_DIR_1", true));
-		assertEquals("dfsFile", jobConfiguration.getString("DISTRIBUTED_CACHE_FILE_NAME_1", ""));
-		assertEquals("hdfs://tmp/file", jobConfiguration.getString("DISTRIBUTED_CACHE_FILE_PATH_1", ""));
-		assertFalse(jobConfiguration.getBoolean("DISTRIBUTED_CACHE_FILE_EXE_1", true));
-	}

 	private static final void assertBefore(JobVertex v1, JobVertex v2, List<JobVertex> list) {
 		boolean seenFirst = false;
@@ -294,4 +281,32 @@ public class JobGraphTest extends TestLogger {
 			}
 		}
 	}
+
+	@Test
+	public void testSetUserArtifactBlobKey() throws IOException, ClassNotFoundException {
+		JobGraph jb = new JobGraph();
+
+		final DistributedCache.DistributedCacheEntry[] entries = {
+			new DistributedCache.DistributedCacheEntry("p1", true, true),
+			new DistributedCache.DistributedCacheEntry("p2", true, false),
+			new DistributedCache.DistributedCacheEntry("p3", false, true),
+			new DistributedCache.DistributedCacheEntry("p4", true, false),
+		};
+
+		for (DistributedCache.DistributedCacheEntry entry : entries) {
+			jb.addUserArtifact(entry.filePath, entry);
+		}
+
+		for (DistributedCache.DistributedCacheEntry entry : entries) {
+			PermanentBlobKey blobKey = new PermanentBlobKey();
+			jb.setUserArtifactBlobKey(entry.filePath, blobKey);
+
+			DistributedCache.DistributedCacheEntry jobGraphEntry = jb.getUserArtifacts().get(entry.filePath);
+			assertNotNull(jobGraphEntry);
+			assertEquals(blobKey, InstantiationUtil.deserializeObject(jobGraphEntry.blobKey, ClassLoader.getSystemClassLoader(), false));
+			assertEquals(entry.isExecutable, jobGraphEntry.isExecutable);
+			assertEquals(entry.isZipped, jobGraphEntry.isZipped);
+			assertEquals(entry.filePath, jobGraphEntry.filePath);
+		}
+	}
 }
