[FLINK-9280][rest] Rework JobSubmitHandler to accept jar/artifact files This closes #6203.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a25cd3fe Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a25cd3fe Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a25cd3fe Branch: refs/heads/master Commit: a25cd3feddd19e75456db32a704ee5509e85dd47 Parents: 544bfb9 Author: zentol <ches...@apache.org> Authored: Mon Jun 11 11:45:12 2018 +0200 Committer: zentol <ches...@apache.org> Committed: Tue Jul 3 21:08:45 2018 +0200 ---------------------------------------------------------------------- docs/_includes/generated/rest_dispatcher.html | 249 +++++++++---------- .../client/program/rest/RestClusterClient.java | 122 ++++----- .../program/rest/RestClusterClientTest.java | 23 -- .../webmonitor/handlers/JarRunHandler.java | 2 +- .../webmonitor/handlers/JarUploadHandler.java | 5 +- .../handlers/JarUploadHandlerTest.java | 2 +- .../flink/runtime/client/ClientUtils.java | 57 +++-- .../apache/flink/runtime/client/JobClient.java | 2 +- .../client/JobSubmissionClientActor.java | 2 +- .../dispatcher/DispatcherRestEndpoint.java | 11 +- .../flink/runtime/minicluster/MiniCluster.java | 2 +- .../flink/runtime/rest/AbstractHandler.java | 140 +++++------ .../rest/handler/AbstractRestHandler.java | 6 +- .../flink/runtime/rest/handler/FileUploads.java | 9 +- .../runtime/rest/handler/HandlerRequest.java | 8 +- .../rest/handler/job/BlobServerPortHandler.java | 66 ----- .../rest/handler/job/JobSubmitHandler.java | 136 +++++++++- .../AbstractTaskManagerFileHandler.java | 4 +- .../rest/messages/BlobServerPortHeaders.java | 74 ------ .../messages/BlobServerPortResponseBody.java | 57 ----- .../rest/messages/job/JobSubmitHeaders.java | 11 +- .../rest/messages/job/JobSubmitRequestBody.java | 115 ++++++--- .../flink/runtime/rest/util/RestConstants.java | 6 +- .../flink/runtime/client/ClientUtilsTest.java | 4 +- .../flink/runtime/rest/AbstractHandlerTest.java | 19 +- .../runtime/rest/MultipartUploadResource.java | 7 +- .../runtime/rest/RestServerEndpointITCase.java | 3 +- .../runtime/rest/handler/FileUploadsTest.java | 6 +- .../handler/job/BlobServerPortHandlerTest.java | 101 -------- .../rest/handler/job/JobSubmitHandlerTest.java | 188 ++++++++++++-- .../messages/BlobServerPortResponseTest.java | 35 --- .../rest/messages/JobSubmitRequestBodyTest.java | 9 +- .../webmonitor/TestingDispatcherGateway.java | 203 +++++++++++++++ .../webmonitor/TestingRestfulGateway.java | 30 +-- 34 files changed, 951 insertions(+), 763 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/docs/_includes/generated/rest_dispatcher.html ---------------------------------------------------------------------- diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_dispatcher.html index c74da9c..6ed59be 100644 --- a/docs/_includes/generated/rest_dispatcher.html +++ b/docs/_includes/generated/rest_dispatcher.html @@ -1,50 +1,6 @@ <table class="table table-bordered"> <tbody> <tr> - <td class="text-left" colspan="2"><strong>/blobserver/port</strong></td> - </tr> - <tr> - <td class="text-left" style="width: 20%">Verb: <code>GET</code></td> - <td class="text-left">Response code: <code>200 OK</code></td> - </tr> - <tr> - <td colspan="2">Returns the port of blob server which can be used to upload jars.</td> - </tr> - <tr> - <td colspan="2"> - <button data-toggle="collapse" data-target="#607508253">Request</button> - <div id="607508253" class="collapse"> - <pre> - <code> -{} </code> - </pre> - </div> - </td> - </tr> - <tr> - <td colspan="2"> - <button data-toggle="collapse" data-target="#1913718109">Response</button> - <div id="1913718109" class="collapse"> - <pre> - <code> -{ - "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:BlobServerPortResponseBody", - "properties" : { - "port" : { - "type" : "integer" - } - } -} </code> - </pre> - </div> - </td> - </tr> - </tbody> -</table> -<table class="table table-bordered"> - <tbody> - <tr> <td class="text-left" colspan="2"><strong>/cluster</strong></td> </tr> <tr> @@ -226,19 +182,11 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa </tr> <tr> <td colspan="2"> - <button data-toggle="collapse" data-target="#919877128">Request</button> - <div id="919877128" class="collapse"> + <button data-toggle="collapse" data-target="#-1290030289">Request</button> + <div id="-1290030289" class="collapse"> <pre> <code> -{ - "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:FileUpload", - "properties" : { - "path" : { - "type" : "string" - } - } -} </code> +{} </code> </pre> </div> </td> @@ -607,7 +555,7 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa <td class="text-left">Response code: <code>202 Accepted</code></td> </tr> <tr> - <td colspan="2">Submits a job. This call is primarily intended to be used by the Flink client.</td> + <td colspan="2">Submits a job. This call is primarily intended to be used by the Flink client. This call expects amultipart/form-data request that consists of file uploads for the serialized JobGraph, jars anddistributed cache artifacts and an attribute named "request"for the JSON payload.</td> </tr> <tr> <td colspan="2"> @@ -619,10 +567,28 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody", "properties" : { - "serializedJobGraph" : { + "jobGraphFileName" : { + "type" : "string" + }, + "jobJarFileNames" : { "type" : "array", "items" : { - "type" : "integer" + "type" : "string" + } + }, + "jobArtifactFileNames" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody:DistributedCacheFile", + "properties" : { + "entryName" : { + "type" : "string" + }, + "fileName" : { + "type" : "string" + } + } } } } @@ -2461,79 +2427,6 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa <table class="table table-bordered"> <tbody> <tr> - <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/accumulators</strong></td> - </tr> - <tr> - <td class="text-left" style="width: 20%">Verb: <code>GET</code></td> - <td class="text-left">Response code: <code>200 OK</code></td> - </tr> - <tr> - <td colspan="2">Returns user-defined accumulators of a task, aggregated across all subtasks.</td> - </tr> - <tr> - <td colspan="2">Path parameters</td> - </tr> - <tr> - <td colspan="2"> - <ul> -<li><code>jobid</code> - description</li> -<li><code>vertexid</code> - description</li> - </ul> - </td> - </tr> - <tr> - <td colspan="2"> - <button data-toggle="collapse" data-target="#485581006">Request</button> - <div id="485581006" class="collapse"> - <pre> - <code> -{} </code> - </pre> - </div> - </td> - </tr> - <tr> - <td colspan="2"> - <button data-toggle="collapse" data-target="#-1070353054">Response</button> - <div id="-1070353054" class="collapse"> - <pre> - <code> -{ - "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexAccumulatorsInfo", - "properties" : { - "id" : { - "type" : "string" - }, - "user-accumulators" : { - "type" : "array", - "items" : { - "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator", - "properties" : { - "name" : { - "type" : "string" - }, - "type" : { - "type" : "string" - }, - "value" : { - "type" : "string" - } - } - } - } - } -} </code> - </pre> - </div> - </td> - </tr> - </tbody> -</table> -<table class="table table-bordered"> - <tbody> - <tr> <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/backpressure</strong></td> </tr> <tr> @@ -2675,6 +2568,100 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=#pa <table class="table table-bordered"> <tbody> <tr> + <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/accumulators</strong></td> + </tr> + <tr> + <td class="text-left" style="width: 20%">Verb: <code>GET</code></td> + <td class="text-left">Response code: <code>200 OK</code></td> + </tr> + <tr> + <td colspan="2">Returns all user-defined accumulators for all subtasks of a task.</td> + </tr> + <tr> + <td colspan="2">Path parameters</td> + </tr> + <tr> + <td colspan="2"> + <ul> +<li><code>jobid</code> - description</li> +<li><code>vertexid</code> - description</li> + </ul> + </td> + </tr> + <tr> + <td colspan="2"> + <button data-toggle="collapse" data-target="#-886388859">Request</button> + <div id="-886388859" class="collapse"> + <pre> + <code> +{} </code> + </pre> + </div> + </td> + </tr> + <tr> + <td colspan="2"> + <button data-toggle="collapse" data-target="#112317594">Response</button> + <div id="112317594" class="collapse"> + <pre> + <code> +{ + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo", + "properties" : { + "id" : { + "type" : "any" + }, + "parallelism" : { + "type" : "integer" + }, + "subtasks" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo:SubtaskAccumulatorsInfo", + "properties" : { + "subtask" : { + "type" : "integer" + }, + "attempt" : { + "type" : "integer" + }, + "host" : { + "type" : "string" + }, + "user-accumulators" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator", + "properties" : { + "name" : { + "type" : "string" + }, + "type" : { + "type" : "string" + }, + "value" : { + "type" : "string" + } + } + } + } + } + } + } + } +} </code> + </pre> + </div> + </td> + </tr> + </tbody> +</table> +<table class="table table-bordered"> + <tbody> + <tr> <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/metrics</strong></td> </tr> <tr> http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 85699d7..935a07f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -22,15 +22,16 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.accumulators.AccumulatorHelper; +import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.NewClusterClient; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy; import org.apache.flink.client.program.rest.retry.WaitStrategy; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobClient; -import org.apache.flink.runtime.client.ClientUtils; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; @@ -42,6 +43,7 @@ import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.FileUpload; import org.apache.flink.runtime.rest.RestClient; import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; import org.apache.flink.runtime.rest.handler.async.TriggerResponse; @@ -49,8 +51,6 @@ import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeader import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters; import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders; import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters; -import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; -import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; @@ -87,10 +87,10 @@ import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerReq import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource; import org.apache.flink.runtime.rest.messages.queue.QueueStatus; import org.apache.flink.runtime.rest.util.RestClientException; +import org.apache.flink.runtime.rest.util.RestConstants; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.runtime.util.LeaderConnectionInfo; import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.runtime.util.ScalaUtils; import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; @@ -102,20 +102,20 @@ import org.apache.flink.util.function.CheckedSupplier; import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; -import akka.actor.AddressFromURIString; - import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; -import java.net.InetSocketAddress; +import java.io.ObjectOutputStream; import java.net.MalformedURLException; import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; @@ -315,36 +315,61 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster // we have to enable queued scheduling because slot will be allocated lazily jobGraph.setAllowQueuedScheduling(true); - log.info("Requesting blob server port."); - CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance()); + CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> { + try { + final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); + try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) { + objectOut.writeObject(jobGraph); + } + return jobGraphFile; + } catch (IOException e) { + throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e)); + } + }, executorService); - CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine( - getDispatcherAddress(), - (BlobServerPortResponseBody response, String dispatcherAddress) -> { - final int blobServerPort = response.port; - final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort); + CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> { + List<String> jarFileNames = new ArrayList<>(8); + List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8); + Collection<FileUpload> filesToUpload = new ArrayList<>(8); - try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig)); - } catch (Exception e) { - throw new CompletionException(e); - } + filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY)); - return jobGraph; - }); + for (Path jar : jobGraph.getUserJars()) { + jarFileNames.add(jar.getName()); + filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR)); + } - CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose( - (JobGraph jobGraphToSubmit) -> { - log.info("Submitting job graph."); + for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) { + artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName())); + filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY)); + } - try { - return sendRequest( - JobSubmitHeaders.getInstance(), - new JobSubmitRequestBody(jobGraph)); - } catch (IOException ioe) { - throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe)); - } - }); + final JobSubmitRequestBody requestBody = new JobSubmitRequestBody( + jobGraphFile.getFileName().toString(), + jarFileNames, + artifactFileNames); + + return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload)); + }); + + final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose( + requestAndFileUploads -> sendRetriableRequest( + JobSubmitHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + requestAndFileUploads.f0, + requestAndFileUploads.f1, + isConnectionProblemOrServiceUnavailable()) + ); + + submissionFuture + .thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile) + .thenAccept(jobGraphFile -> { + try { + Files.delete(jobGraphFile); + } catch (IOException e) { + log.warn("Could not delete temporary file {}.", jobGraphFile, e); + } + }); return submissionFuture .thenApply( @@ -676,9 +701,14 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRetriableRequest(M messageHeaders, U messageParameters, R request, Predicate<Throwable> retryPredicate) { + return sendRetriableRequest(messageHeaders, messageParameters, request, Collections.emptyList(), retryPredicate); + } + + private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> + sendRetriableRequest(M messageHeaders, U messageParameters, R request, Collection<FileUpload> filesToUpload, Predicate<Throwable> retryPredicate) { return retry(() -> getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> { try { - return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request); + return restClient.sendRequest(webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request, filesToUpload); } catch (IOException e) { throw new CompletionException(e); } @@ -736,26 +766,4 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster } }, executorService); } - - private CompletableFuture<String> getDispatcherAddress() { - return FutureUtils.orTimeout( - dispatcherLeaderRetriever.getLeaderFuture(), - restClusterClientConfiguration.getAwaitLeaderTimeout(), - TimeUnit.MILLISECONDS) - .thenApplyAsync(leaderAddressSessionId -> { - final String address = leaderAddressSessionId.f0; - final Optional<String> host = ScalaUtils.<String>toJava(AddressFromURIString.parse(address).host()); - - return host.orElseGet(() -> { - // if the dispatcher address does not contain a host part, then assume it's running - // on the same machine as the client - log.info("The dispatcher seems to run without remoting enabled. This indicates that we are " + - "in a test. This can only work if the RestClusterClient runs on the same machine. " + - "Assuming, therefore, 'localhost' as the host."); - - return "localhost"; - }); - }, executorService); - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index f025d67..75f16c0 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -53,8 +53,6 @@ import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; import org.apache.flink.runtime.rest.handler.async.TriggerResponse; import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter; -import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; -import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; @@ -226,7 +224,6 @@ public class RestClusterClientTest extends TestLogger { @Test public void testJobSubmitCancelStop() throws Exception { - TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler(); TestJobSubmitHandler submitHandler = new TestJobSubmitHandler(); TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler(); TestJobExecutionResultHandler testJobExecutionResultHandler = @@ -237,15 +234,12 @@ public class RestClusterClientTest extends TestLogger { .build())); try (TestRestServerEndpoint ignored = createRestServerEndpoint( - portHandler, submitHandler, terminationHandler, testJobExecutionResultHandler)) { - Assert.assertFalse(portHandler.portRetrieved); Assert.assertFalse(submitHandler.jobSubmitted); restClusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader()); - Assert.assertTrue(portHandler.portRetrieved); Assert.assertTrue(submitHandler.jobSubmitted); Assert.assertFalse(terminationHandler.jobCanceled); @@ -264,11 +258,9 @@ public class RestClusterClientTest extends TestLogger { @Test public void testDetachedJobSubmission() throws Exception { - final TestBlobServerPortHandler testBlobServerPortHandler = new TestBlobServerPortHandler(); final TestJobSubmitHandler testJobSubmitHandler = new TestJobSubmitHandler(); try (TestRestServerEndpoint ignored = createRestServerEndpoint( - testBlobServerPortHandler, testJobSubmitHandler)) { restClusterClient.setDetached(true); @@ -282,20 +274,6 @@ public class RestClusterClientTest extends TestLogger { } - private class TestBlobServerPortHandler extends TestHandler<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> { - private volatile boolean portRetrieved = false; - - private TestBlobServerPortHandler() { - super(BlobServerPortHeaders.getInstance()); - } - - @Override - protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - portRetrieved = true; - return CompletableFuture.completedFuture(new BlobServerPortResponseBody(12000)); - } - } - private class TestJobSubmitHandler extends TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> { private volatile boolean jobSubmitted = false; @@ -390,7 +368,6 @@ public class RestClusterClientTest extends TestLogger { try (TestRestServerEndpoint ignored = createRestServerEndpoint( testJobExecutionResultHandler, - new TestBlobServerPortHandler(), new TestJobSubmitHandler())) { JobExecutionResult jobExecutionResult; http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 10387c8..1e620d4 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -114,7 +114,7 @@ public class JarRunHandler extends CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> { final InetSocketAddress address = new InetSocketAddress(getDispatcherHost(gateway), blobServerPort); try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration)); + ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration)); } catch (FlinkException e) { throw new CompletionException(e); } http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java index a1ef82b..83db224 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java @@ -32,6 +32,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt import javax.annotation.Nonnull; +import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -70,11 +71,11 @@ public class JarUploadHandler extends protected CompletableFuture<JarUploadResponseBody> handleRequest( @Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull final RestfulGateway gateway) throws RestHandlerException { - Collection<Path> uploadedFiles = request.getUploadedFiles(); + Collection<File> uploadedFiles = request.getUploadedFiles(); if (uploadedFiles.size() != 1) { throw new RestHandlerException("Exactly 1 file must be sent, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST); } - final Path fileUpload = uploadedFiles.iterator().next(); + final Path fileUpload = uploadedFiles.iterator().next().toPath(); return CompletableFuture.supplyAsync(() -> { if (!fileUpload.getFileName().toString().endsWith(".jar")) { throw new CompletionException(new RestHandlerException( http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java index 812d4c6..c9e25ed 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java @@ -131,6 +131,6 @@ public class JarUploadHandlerTest extends TestLogger { EmptyMessageParameters.getInstance(), Collections.emptyMap(), Collections.emptyMap(), - Collections.singleton(uploadedFile)); + Collections.singleton(uploadedFile.toFile())); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java index fc6a621..06baaaf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.client; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.cache.DistributedCache; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobClient; @@ -32,7 +31,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -43,20 +41,41 @@ public enum ClientUtils { ; /** - * Uploads all files required for the execution of the given {@link JobGraph} using the {@link BlobClient} from - * the given {@link Supplier}. + * Extracts all files required for the execution from the given {@link JobGraph} and uploads them using the {@link BlobClient} + * from the given {@link Supplier}. * * @param jobGraph jobgraph requiring files * @param clientSupplier supplier of blob client to upload files with - * @throws IOException if the upload fails + * @throws FlinkException if the upload fails */ - public static void uploadJobGraphFiles(JobGraph jobGraph, SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException { + public static void extractAndUploadJobGraphFiles(JobGraph jobGraph, SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException { List<Path> userJars = jobGraph.getUserJars(); - Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts(); + Collection<Tuple2<String, Path>> userArtifacts = jobGraph.getUserArtifacts().entrySet().stream() + .map(entry -> Tuple2.of(entry.getKey(), new Path(entry.getValue().filePath))) + .collect(Collectors.toList()); + + uploadJobGraphFiles(jobGraph, userJars, userArtifacts, clientSupplier); + } + + /** + * Uploads the given jars and artifacts required for the execution of the given {@link JobGraph} using the {@link BlobClient} from + * the given {@link Supplier}. + * + * @param jobGraph jobgraph requiring files + * @param userJars jars to upload + * @param userArtifacts artifacts to upload + * @param clientSupplier supplier of blob client to upload files with + * @throws FlinkException if the upload fails + */ + public static void uploadJobGraphFiles( + JobGraph jobGraph, + Collection<Path> userJars, + Collection<Tuple2<String, org.apache.flink.core.fs.Path>> userArtifacts, + SupplierWithException<BlobClient, IOException> clientSupplier) throws FlinkException { if (!userJars.isEmpty() || !userArtifacts.isEmpty()) { try (BlobClient client = clientSupplier.get()) { - uploadAndSetUserJars(jobGraph, client); - uploadAndSetUserArtifacts(jobGraph, client); + uploadAndSetUserJars(jobGraph, userJars, client); + uploadAndSetUserArtifacts(jobGraph, userArtifacts, client); } catch (IOException ioe) { throw new FlinkException("Could not upload job files.", ioe); } @@ -64,15 +83,15 @@ public enum ClientUtils { } /** - * Uploads the user jars from the given {@link JobGraph} using the given {@link BlobClient}, - * and sets the appropriate blobkeys. + * Uploads the given user jars using the given {@link BlobClient}, and sets the appropriate blobkeys on the given {@link JobGraph}. * - * @param jobGraph jobgraph requiring user jars + * @param jobGraph jobgraph requiring user jars + * @param userJars jars to upload * @param blobClient client to upload jars with * @throws IOException if the upload fails */ - private static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient blobClient) throws IOException { - Collection<PermanentBlobKey> blobKeys = uploadUserJars(jobGraph.getJobID(), jobGraph.getUserJars(), blobClient); + private static void uploadAndSetUserJars(JobGraph jobGraph, Collection<Path> userJars, BlobClient blobClient) throws IOException { + Collection<PermanentBlobKey> blobKeys = uploadUserJars(jobGraph.getJobID(), userJars, blobClient); setUserJarBlobKeys(blobKeys, jobGraph); } @@ -90,18 +109,14 @@ public enum ClientUtils { } /** - * Uploads the user artifacts from the given {@link JobGraph} using the given {@link BlobClient}, - * and sets the appropriate blobkeys. + * Uploads the given user artifacts using the given {@link BlobClient}, and sets the appropriate blobkeys on the given {@link JobGraph}. * * @param jobGraph jobgraph requiring user artifacts + * @param artifactPaths artifacts to upload * @param blobClient client to upload artifacts with * @throws IOException if the upload fails */ - private static void uploadAndSetUserArtifacts(JobGraph jobGraph, BlobClient blobClient) throws IOException { - Collection<Tuple2<String, Path>> artifactPaths = jobGraph.getUserArtifacts().entrySet().stream() - .map(entry -> Tuple2.of(entry.getKey(), new Path(entry.getValue().filePath))) - .collect(Collectors.toList()); - + private static void uploadAndSetUserArtifacts(JobGraph jobGraph, Collection<Tuple2<String, Path>> artifactPaths, BlobClient blobClient) throws IOException { Collection<Tuple2<String, PermanentBlobKey>> blobKeys = uploadUserArtifacts(jobGraph.getJobID(), artifactPaths, blobClient); setUserArtifactBlobKeys(jobGraph, blobKeys); } http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index 27da3b8..635def2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -424,7 +424,7 @@ public class JobClient { } try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, config)); + ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, config)); } catch (FlinkException e) { throw new JobSubmissionException(jobGraph.getJobID(), "Could not upload job files.", e); http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java index 2783b09..89978fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java @@ -155,7 +155,7 @@ public class JobSubmissionClientActor extends JobClientActor { final CompletableFuture<Void> jarUploadFuture = blobServerAddressFuture.thenAcceptAsync( (InetSocketAddress blobServerAddress) -> { try { - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, clientConfig)); + ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(blobServerAddress, clientConfig)); } catch (FlinkException e) { throw new CompletionException(e); } http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 8072cf4..4279330 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; -import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler; import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint; @@ -88,17 +87,12 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway final Time timeout = restConfiguration.getTimeout(); - BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler( - restAddressFuture, - leaderRetriever, - timeout, - responseHeaders); - JobSubmitHandler jobSubmitHandler = new JobSubmitHandler( restAddressFuture, leaderRetriever, timeout, - responseHeaders); + responseHeaders, + executor); if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) { try { @@ -125,7 +119,6 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway log.info("Web-based job submission is not enabled."); } - handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler)); handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler)); return handlers; http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 4fab2b8..97ab5a5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -676,7 +676,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { private CompletableFuture<Void> uploadAndSetJobFiles(final CompletableFuture<InetSocketAddress> blobServerAddressFuture, final JobGraph job) { return blobServerAddressFuture.thenAccept(blobServerAddress -> { try { - ClientUtils.uploadJobGraphFiles(job, () -> new BlobClient(blobServerAddress, miniClusterConfiguration.getConfiguration())); + ClientUtils.extractAndUploadJobGraphFiles(job, () -> new BlobClient(blobServerAddress, miniClusterConfiguration.getConfiguration())); } catch (FlinkException e) { throw new CompletionException(e); } http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java index 1e88425..0d8605a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java @@ -49,7 +49,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -82,98 +84,84 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques } @Override - protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, T gateway) throws Exception { + protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRequest, T gateway) { HttpRequest httpRequest = routedRequest.getRequest(); if (log.isTraceEnabled()) { log.trace("Received request " + httpRequest.uri() + '.'); } + FileUploads uploadedFiles = null; try { if (!(httpRequest instanceof FullHttpRequest)) { // The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns // FullHttpRequests. log.error("Implementation error: Received a request that wasn't a FullHttpRequest."); - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("Bad request received."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; + throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST); } final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); - try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) { + uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx); - if (!untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) { - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("File uploads not allowed."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; - } + if (!untypedResponseMessageHeaders.acceptsFileUploads() && !uploadedFiles.getUploadedFiles().isEmpty()) { + throw new RestHandlerException("File uploads not allowed.", HttpResponseStatus.BAD_REQUEST); + } - R request; - if (msgContent.capacity() == 0) { - try { - request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass()); - } catch (JsonParseException | JsonMappingException je) { - log.error("Request did not conform to expected format.", je); - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("Bad request received."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; - } - } else { - try { - ByteBufInputStream in = new ByteBufInputStream(msgContent); - request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass()); - } catch (JsonParseException | JsonMappingException je) { - log.error("Failed to read request.", je); - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody(String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName())), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; - } + R request; + if (msgContent.capacity() == 0) { + try { + request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass()); + } catch (JsonParseException | JsonMappingException je) { + log.error("Request did not conform to expected format.", je); + throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST, je); } - - final HandlerRequest<R, M> handlerRequest; - + } else { try { - handlerRequest = new HandlerRequest<R, M>( - request, - untypedResponseMessageHeaders.getUnresolvedMessageParameters(), - routedRequest.getRouteResult().pathParams(), - routedRequest.getRouteResult().queryParams(), - uploadedFiles.getUploadedFiles()); - } catch (HandlerRequestException hre) { - log.error("Could not create the handler request.", hre); - - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())), + ByteBufInputStream in = new ByteBufInputStream(msgContent); + request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass()); + } catch (JsonParseException | JsonMappingException je) { + log.error("Failed to read request.", je); + throw new RestHandlerException( + String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName()), HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; + je); } + } - respondToRequest( - ctx, - httpRequest, - handlerRequest, - gateway); + final HandlerRequest<R, M> handlerRequest; + + try { + handlerRequest = new HandlerRequest<R, M>( + request, + untypedResponseMessageHeaders.getUnresolvedMessageParameters(), + routedRequest.getRouteResult().pathParams(), + routedRequest.getRouteResult().queryParams(), + uploadedFiles.getUploadedFiles()); + } catch (HandlerRequestException hre) { + log.error("Could not create the handler request.", hre); + throw new RestHandlerException( + String.format("Bad request, could not parse parameters: %s", hre.getMessage()), + HttpResponseStatus.BAD_REQUEST, + hre); } + CompletableFuture<Void> requestProcessingFuture = respondToRequest( + ctx, + httpRequest, + handlerRequest, + gateway); + + final FileUploads finalUploadedFiles = uploadedFiles; + requestProcessingFuture + .whenComplete((Void ignored, Throwable throwable) -> cleanupFileUploads(finalUploadedFiles)); + } catch (RestHandlerException rhe) { + HandlerUtils.sendErrorResponse( + ctx, + httpRequest, + new ErrorResponseBody(rhe.getMessage()), + rhe.getHttpResponseStatus(), + responseHeaders); + cleanupFileUploads(uploadedFiles); } catch (Throwable e) { log.error("Request processing failed.", e); HandlerUtils.sendErrorResponse( @@ -182,6 +170,17 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR, responseHeaders); + cleanupFileUploads(uploadedFiles); + } + } + + private void cleanupFileUploads(@Nullable FileUploads uploadedFiles) { + if (uploadedFiles != null) { + try { + uploadedFiles.close(); + } catch (IOException e) { + log.warn("Could not cleanup uploaded files.", e); + } } } @@ -192,9 +191,10 @@ public abstract class AbstractHandler<T extends RestfulGateway, R extends Reques * @param httpRequest original http request * @param handlerRequest typed handler request * @param gateway leader gateway + * @return Future which is completed once the request has been processed * @throws RestHandlerException if an exception occurred while responding */ - protected abstract void respondToRequest( + protected abstract CompletableFuture<Void> respondToRequest( ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java index 448711b..e4cec08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java @@ -70,7 +70,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re } @Override - protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) { + protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) { CompletableFuture<P> response; try { @@ -79,7 +79,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re response = FutureUtils.completedExceptionally(e); } - response.whenComplete((P resp, Throwable throwable) -> { + return response.whenComplete((P resp, Throwable throwable) -> { if (throwable != null) { Throwable error = ExceptionUtils.stripCompletionException(throwable); @@ -105,7 +105,7 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re messageHeaders.getResponseStatusCode(), responseHeaders); } - }); + }).thenApply(ignored -> null); } private void processRestHandlerException(ChannelHandlerContext ctx, HttpRequest httpRequest, RestHandlerException rhe) { http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java index 31ac47bb..b233cb5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java @@ -24,6 +24,7 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.File; import java.io.IOException; import java.nio.file.FileVisitResult; import java.nio.file.Files; @@ -58,7 +59,7 @@ public final class FileUploads implements AutoCloseable { this.uploadDirectory = uploadDirectory; } - public Collection<Path> getUploadedFiles() throws IOException { + public Collection<File> getUploadedFiles() throws IOException { if (uploadDirectory == null) { return Collections.emptyList(); } @@ -78,9 +79,9 @@ public final class FileUploads implements AutoCloseable { private static final class FileAdderVisitor extends SimpleFileVisitor<Path> { - private final Collection<Path> files = new ArrayList<>(4); + private final Collection<File> files = new ArrayList<>(4); - Collection<Path> getContainedFiles() { + Collection<File> getContainedFiles() { return files; } @@ -90,7 +91,7 @@ public final class FileUploads implements AutoCloseable { @Override public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { FileVisitResult result = super.visitFile(file, attrs); - files.add(file); + files.add(file.toFile()); return result; } } http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java index 7e93556..990dae5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java @@ -26,7 +26,7 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; -import java.nio.file.Path; +import java.io.File; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -43,7 +43,7 @@ import java.util.StringJoiner; public class HandlerRequest<R extends RequestBody, M extends MessageParameters> { private final R requestBody; - private final Collection<Path> uploadedFiles; + private final Collection<File> uploadedFiles; private final Map<Class<? extends MessagePathParameter<?>>, MessagePathParameter<?>> pathParameters = new HashMap<>(2); private final Map<Class<? extends MessageQueryParameter<?>>, MessageQueryParameter<?>> queryParameters = new HashMap<>(2); @@ -55,7 +55,7 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters> this(requestBody, messageParameters, receivedPathParameters, receivedQueryParameters, Collections.emptyList()); } - public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters, Collection<Path> uploadedFiles) throws HandlerRequestException { + public HandlerRequest(R requestBody, M messageParameters, Map<String, String> receivedPathParameters, Map<String, List<String>> receivedQueryParameters, Collection<File> uploadedFiles) throws HandlerRequestException { this.requestBody = Preconditions.checkNotNull(requestBody); this.uploadedFiles = Collections.unmodifiableCollection(Preconditions.checkNotNull(uploadedFiles)); Preconditions.checkNotNull(messageParameters); @@ -141,7 +141,7 @@ public class HandlerRequest<R extends RequestBody, M extends MessageParameters> } @Nonnull - public Collection<Path> getUploadedFiles() { + public Collection<File> getUploadedFiles() { return uploadedFiles; } } http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java deleted file mode 100644 index 4b5fa89..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.handler.job; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.dispatcher.DispatcherGateway; -import org.apache.flink.runtime.rest.handler.AbstractRestHandler; -import org.apache.flink.runtime.rest.handler.HandlerRequest; -import org.apache.flink.runtime.rest.handler.RestHandlerException; -import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; -import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; -import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; -import org.apache.flink.runtime.rest.messages.EmptyRequestBody; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.util.ExceptionUtils; - -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; - -import javax.annotation.Nonnull; - -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; - -/** - * This handler can be used to retrieve the port that the blob server runs on. - */ -public final class BlobServerPortHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> { - - public BlobServerPortHandler( - CompletableFuture<String> localRestAddress, - GatewayRetriever<? extends DispatcherGateway> leaderRetriever, - Time timeout, - Map<String, String> headers) { - super(localRestAddress, leaderRetriever, timeout, headers, BlobServerPortHeaders.getInstance()); - } - - @Override - protected CompletableFuture<BlobServerPortResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - return gateway - .getBlobServerPort(timeout) - .thenApply(BlobServerPortResponseBody::new) - .exceptionally(error -> { - throw new CompletionException(new RestHandlerException( - "Failed to retrieve blob server port.", - HttpResponseStatus.INTERNAL_SERVER_ERROR, - ExceptionUtils.stripCompletionException(error))); - }); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java index af04629..052b056 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java @@ -19,8 +19,14 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.client.ClientUtils; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; @@ -29,47 +35,153 @@ import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.FlinkException; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import javax.annotation.Nonnull; -import java.io.ByteArrayInputStream; +import java.io.File; import java.io.ObjectInputStream; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; /** * This handler can be used to submit jobs to a Flink cluster. */ public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> { + private static final String FILE_TYPE_JOB_GRAPH = "JobGraph"; + private static final String FILE_TYPE_JAR = "Jar"; + private static final String FILE_TYPE_ARTIFACT = "Artifact"; + + private final Executor executor; + public JobSubmitHandler( CompletableFuture<String> localRestAddress, GatewayRetriever<? extends DispatcherGateway> leaderRetriever, Time timeout, - Map<String, String> headers) { + Map<String, String> headers, + Executor executor) { super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); + this.executor = executor; } @Override protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - JobGraph jobGraph; - try { - ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph)); - jobGraph = (JobGraph) objectIn.readObject(); - } catch (Exception e) { + final Collection<File> uploadedFiles = request.getUploadedFiles(); + final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap( + File::getName, + Path::fromLocalFile + )); + + if (uploadedFiles.size() != nameToFile.size()) { throw new RestHandlerException( - "Failed to deserialize JobGraph.", - HttpResponseStatus.BAD_REQUEST, - e); + String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s", + uploadedFiles.size() < nameToFile.size() ? "lower" : "higher", + nameToFile.size(), + uploadedFiles.size()), + HttpResponseStatus.BAD_REQUEST + ); } - return gateway.submitJob(jobGraph, timeout) - .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())) + final JobSubmitRequestBody requestBody = request.getRequestBody(); + + CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile); + + Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile); + + Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile); + + CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts); + + CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout)); + + return jobSubmissionFuture.thenCombine(jobGraphFuture, + (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID())) .exceptionally(exception -> { throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception)); }); } + + private CompletableFuture<JobGraph> loadJobGraph(JobSubmitRequestBody requestBody, Map<String, Path> nameToFile) throws MissingFileException { + final Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_JOB_GRAPH, nameToFile); + + return CompletableFuture.supplyAsync(() -> { + JobGraph jobGraph; + try (ObjectInputStream objectIn = new ObjectInputStream(jobGraphFile.getFileSystem().open(jobGraphFile))) { + jobGraph = (JobGraph) objectIn.readObject(); + } catch (Exception e) { + throw new CompletionException(new RestHandlerException( + "Failed to deserialize JobGraph.", + HttpResponseStatus.BAD_REQUEST, + e)); + } + return jobGraph; + }, executor); + } + + private static Collection<Path> getJarFilesToUpload(Collection<String> jarFileNames, Map<String, Path> nameToFileMap) throws MissingFileException { + Collection<Path> jarFiles = new ArrayList<>(jarFileNames.size()); + for (String jarFileName : jarFileNames) { + Path jarFile = getPathAndAssertUpload(jarFileName, FILE_TYPE_JAR, nameToFileMap); + jarFiles.add(new Path(jarFile.toString())); + } + return jarFiles; + } + + private static Collection<Tuple2<String, Path>> getArtifactFilesToUpload( + Collection<JobSubmitRequestBody.DistributedCacheFile> artifactEntries, + Map<String, Path> nameToFileMap) throws MissingFileException { + Collection<Tuple2<String, Path>> artifacts = new ArrayList<>(artifactEntries.size()); + for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : artifactEntries) { + Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, FILE_TYPE_ARTIFACT, nameToFileMap); + artifacts.add(Tuple2.of(artifactFileName.entryName, new Path(artifactFile.toString()))); + } + return artifacts; + } + + private CompletableFuture<JobGraph> uploadJobGraphFiles( + DispatcherGateway gateway, + CompletableFuture<JobGraph> jobGraphFuture, + Collection<Path> jarFiles, + Collection<Tuple2<String, Path>> artifacts) { + CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout); + + return jobGraphFuture.thenCombine(blobServerPortFuture, (JobGraph jobGraph, Integer blobServerPort) -> { + final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort); + try { + ClientUtils.uploadJobGraphFiles(jobGraph, jarFiles, artifacts, () -> new BlobClient(address, new Configuration())); + } catch (FlinkException e) { + throw new CompletionException(new RestHandlerException( + "Could not upload job files.", + HttpResponseStatus.INTERNAL_SERVER_ERROR, + e)); + } + return jobGraph; + }); + } + + private static Path getPathAndAssertUpload(String fileName, String type, Map<String, Path> uploadedFiles) throws MissingFileException { + final Path file = uploadedFiles.get(fileName); + if (file == null) { + throw new MissingFileException(type, fileName); + } + return file; + } + + private static final class MissingFileException extends RestHandlerException { + + private static final long serialVersionUID = -7954810495610194965L; + + MissingFileException(String type, String fileName) { + super(type + " file " + fileName + " could not be found on the server.", HttpResponseStatus.BAD_REQUEST); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java index 83fab69..edefa15 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java @@ -117,7 +117,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag } @Override - protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) throws RestHandlerException { + protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) throws RestHandlerException { final ResourceID taskManagerId = handlerRequest.getPathParameter(TaskManagerIdPathParameter.class); final CompletableFuture<TransientBlobKey> blobKeyFuture; @@ -152,7 +152,7 @@ public abstract class AbstractTaskManagerFileHandler<M extends TaskManagerMessag }, ctx.executor()); - resultFuture.whenComplete( + return resultFuture.whenComplete( (Void ignored, Throwable throwable) -> { if (throwable != null) { log.error("Failed to transfer file from TaskExecutor {}.", taskManagerId, throwable); http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java deleted file mode 100644 index a845de3..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.messages; - -import org.apache.flink.runtime.rest.HttpMethodWrapper; - -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; - -/** - * These headers define the protocol for querying the port of the blob server. - */ -public class BlobServerPortHeaders implements MessageHeaders<EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> { - - private static final String URL = "/blobserver/port"; - private static final BlobServerPortHeaders INSTANCE = new BlobServerPortHeaders(); - - private BlobServerPortHeaders() { - } - - @Override - public Class<EmptyRequestBody> getRequestClass() { - return EmptyRequestBody.class; - } - - @Override - public HttpMethodWrapper getHttpMethod() { - return HttpMethodWrapper.GET; - } - - @Override - public String getTargetRestEndpointURL() { - return URL; - } - - @Override - public Class<BlobServerPortResponseBody> getResponseClass() { - return BlobServerPortResponseBody.class; - } - - @Override - public HttpResponseStatus getResponseStatusCode() { - return HttpResponseStatus.OK; - } - - @Override - public EmptyMessageParameters getUnresolvedMessageParameters() { - return EmptyMessageParameters.getInstance(); - } - - public static BlobServerPortHeaders getInstance() { - return INSTANCE; - } - - @Override - public String getDescription() { - return "Returns the port of blob server which can be used to upload jars."; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java deleted file mode 100644 index 895ecf3..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.messages; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; - -/** - * Response containing the blob server port. - */ -public final class BlobServerPortResponseBody implements ResponseBody { - - static final String FIELD_NAME_PORT = "port"; - - /** - * The port of the blob server. - */ - @JsonProperty(FIELD_NAME_PORT) - public final int port; - - @JsonCreator - public BlobServerPortResponseBody( - @JsonProperty(FIELD_NAME_PORT) int port) { - - this.port = port; - } - - @Override - public int hashCode() { - return 67 * port; - } - - @Override - public boolean equals(Object object) { - if (object instanceof BlobServerPortResponseBody) { - BlobServerPortResponseBody other = (BlobServerPortResponseBody) object; - return this.port == other.port; - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java index 88f53f2..42f64b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rest.messages.job; +import org.apache.flink.runtime.rest.FileUploadHandler; import org.apache.flink.runtime.rest.HttpMethodWrapper; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; @@ -71,6 +72,14 @@ public class JobSubmitHeaders implements MessageHeaders<JobSubmitRequestBody, Jo @Override public String getDescription() { - return "Submits a job. This call is primarily intended to be used by the Flink client."; + return "Submits a job. This call is primarily intended to be used by the Flink client. This call expects a" + + "multipart/form-data request that consists of file uploads for the serialized JobGraph, jars and" + + "distributed cache artifacts and an attribute named \"" + FileUploadHandler.HTTP_ATTRIBUTE_REQUEST + "\"for " + + "the JSON payload."; + } + + @Override + public boolean acceptsFileUploads() { + return true; } } http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java index 3f550f0..0ca82b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java @@ -18,64 +18,119 @@ package org.apache.flink.runtime.rest.messages.job; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.rest.messages.RequestBody; -import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; /** * Request for submitting a job. * - * <p>We currently require the job-jars to be uploaded through the blob-server. + * <p>This request only contains the names of files that must be present on the server, and defines how these files are + * interpreted. */ public final class JobSubmitRequestBody implements RequestBody { - private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph"; + private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName"; + private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames"; + private static final String FIELD_NAME_JOB_ARTIFACTS = "jobArtifactFileNames"; - /** - * The serialized job graph. - */ - @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) - public final byte[] serializedJobGraph; + @JsonProperty(FIELD_NAME_JOB_GRAPH) + public final String jobGraphFileName; - public JobSubmitRequestBody(JobGraph jobGraph) throws IOException { - this(serializeJobGraph(jobGraph)); - } + @JsonProperty(FIELD_NAME_JOB_JARS) + public final Collection<String> jarFileNames; + + @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) + public final Collection<DistributedCacheFile> artifactFileNames; @JsonCreator public JobSubmitRequestBody( - @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] serializedJobGraph) { - this.serializedJobGraph = Preconditions.checkNotNull(serializedJobGraph); + @JsonProperty(FIELD_NAME_JOB_GRAPH) String jobGraphFileName, + @JsonProperty(FIELD_NAME_JOB_JARS) Collection<String> jarFileNames, + @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) Collection<DistributedCacheFile> artifactFileNames) { + this.jobGraphFileName = jobGraphFileName; + this.jarFileNames = jarFileNames; + this.artifactFileNames = artifactFileNames; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobSubmitRequestBody that = (JobSubmitRequestBody) o; + return Objects.equals(jobGraphFileName, that.jobGraphFileName) && + Objects.equals(jarFileNames, that.jarFileNames) && + Objects.equals(artifactFileNames, that.artifactFileNames); } @Override public int hashCode() { - return 71 * Arrays.hashCode(this.serializedJobGraph); + return Objects.hash(jobGraphFileName, jarFileNames, artifactFileNames); } @Override - public boolean equals(Object object) { - if (object instanceof JobSubmitRequestBody) { - JobSubmitRequestBody other = (JobSubmitRequestBody) object; - return Arrays.equals(this.serializedJobGraph, other.serializedJobGraph); - } - return false; + public String toString() { + return "JobSubmitRequestBody{" + + "jobGraphFileName='" + jobGraphFileName + '\'' + + ", jarFileNames=" + jarFileNames + + ", artifactFileNames=" + artifactFileNames + + '}'; } - private static byte[] serializeJobGraph(JobGraph jobGraph) throws IOException { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 * 1024)) { - ObjectOutputStream out = new ObjectOutputStream(baos); + /** + * Descriptor for a distributed cache file. + */ + public static class DistributedCacheFile { + private static final String FIELD_NAME_ENTRY_NAME = "entryName"; + private static final String FIELD_NAME_FILE_NAME = "fileName"; + + @JsonProperty(FIELD_NAME_ENTRY_NAME) + public final String entryName; + + @JsonProperty(FIELD_NAME_FILE_NAME) + public final String fileName; + + @JsonCreator + public DistributedCacheFile( + @JsonProperty(FIELD_NAME_ENTRY_NAME) String entryName, + @JsonProperty(FIELD_NAME_FILE_NAME) String fileName) { + this.entryName = entryName; + this.fileName = fileName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DistributedCacheFile that = (DistributedCacheFile) o; + return Objects.equals(entryName, that.entryName) && + Objects.equals(fileName, that.fileName); + } - out.writeObject(jobGraph); + @Override + public int hashCode() { + + return Objects.hash(entryName, fileName); + } - return baos.toByteArray(); + @Override + public String toString() { + return "DistributedCacheFile{" + + "entryName='" + entryName + '\'' + + ", fileName='" + fileName + '\'' + + '}'; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java index c5135bf..a538e17 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java @@ -23,6 +23,10 @@ import org.apache.flink.configuration.ConfigConstants; /** * This class contains constants to be used by rest components. */ -public class RestConstants { +public enum RestConstants { + ; + public static final String REST_CONTENT_TYPE = "application/json; charset=" + ConfigConstants.DEFAULT_CHARSET.name(); + public static final String CONTENT_TYPE_JAR = "application/java-archive"; + public static final String CONTENT_TYPE_BINARY = "application/octet-stream"; } http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java index dc14cb1..f151f28 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java @@ -85,7 +85,7 @@ public class ClientUtilsTest extends TestLogger { assertEquals(jars.size(), jobGraph.getUserJars().size()); assertEquals(0, jobGraph.getUserJarBlobKeys().size()); - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration())); + ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration())); assertEquals(jars.size(), jobGraph.getUserJars().size()); assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size()); @@ -124,7 +124,7 @@ public class ClientUtilsTest extends TestLogger { assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size()); assertEquals(0, jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count()); - ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration())); + ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new Configuration())); assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size()); assertEquals(localArtifacts.size(), jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != null).count()); http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java index 91fba68..607c1c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java @@ -81,7 +81,8 @@ public class AbstractHandlerTest extends TestLogger { final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway); - TestHandler handler = new TestHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever); + CompletableFuture<Void> requestProcessingCompleteFuture = new CompletableFuture<>(); + TestHandler handler = new TestHandler(requestProcessingCompleteFuture, CompletableFuture.completedFuture(restAddress), mockGatewayRetriever); RouteResult<?> routeResult = new RouteResult<>("", "", Collections.emptyMap(), Collections.emptyMap(), ""); HttpRequest request = new DefaultFullHttpRequest( @@ -101,6 +102,9 @@ public class AbstractHandlerTest extends TestLogger { handler.respondAsLeader(context, routerRequest, mockRestfulGateway); + // the (asynchronous) request processing is not yet complete so the files should still exist + Assert.assertTrue(Files.exists(file)); + requestProcessingCompleteFuture.complete(null); Assert.assertFalse(Files.exists(file)); } @@ -156,14 +160,16 @@ public class AbstractHandlerTest extends TestLogger { } private static class TestHandler extends AbstractHandler<RestfulGateway, EmptyRequestBody, EmptyMessageParameters> { + private final CompletableFuture<Void> completionFuture; - protected TestHandler(@Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever) { + protected TestHandler(CompletableFuture<Void> completionFuture, @Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever) { super(localAddressFuture, leaderRetriever, RpcUtils.INF_TIMEOUT, Collections.emptyMap(), TestHeaders.INSTANCE); + this.completionFuture = completionFuture; } @Override - protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, RestfulGateway gateway) throws RestHandlerException { - + protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, RestfulGateway gateway) throws RestHandlerException { + return completionFuture; } private enum TestHeaders implements UntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> { @@ -188,6 +194,11 @@ public class AbstractHandlerTest extends TestLogger { public String getTargetRestEndpointURL() { return "/test"; } + + @Override + public boolean acceptsFileUploads() { + return true; + } } } }