[FLINK-9280][rest] Rework JobSubmitHandler to accept jar/artifact files

This closes #6203.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a25cd3fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a25cd3fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a25cd3fe

Branch: refs/heads/master
Commit: a25cd3feddd19e75456db32a704ee5509e85dd47
Parents: 544bfb9
Author: zentol <ches...@apache.org>
Authored: Mon Jun 11 11:45:12 2018 +0200
Committer: zentol <ches...@apache.org>
Committed: Tue Jul 3 21:08:45 2018 +0200

----------------------------------------------------------------------
 docs/_includes/generated/rest_dispatcher.html   | 249 +++++++++----------
 .../client/program/rest/RestClusterClient.java  | 122 ++++-----
 .../program/rest/RestClusterClientTest.java     |  23 --
 .../webmonitor/handlers/JarRunHandler.java      |   2 +-
 .../webmonitor/handlers/JarUploadHandler.java   |   5 +-
 .../handlers/JarUploadHandlerTest.java          |   2 +-
 .../flink/runtime/client/ClientUtils.java       |  57 +++--
 .../apache/flink/runtime/client/JobClient.java  |   2 +-
 .../client/JobSubmissionClientActor.java        |   2 +-
 .../dispatcher/DispatcherRestEndpoint.java      |  11 +-
 .../flink/runtime/minicluster/MiniCluster.java  |   2 +-
 .../flink/runtime/rest/AbstractHandler.java     | 140 +++++------
 .../rest/handler/AbstractRestHandler.java       |   6 +-
 .../flink/runtime/rest/handler/FileUploads.java |   9 +-
 .../runtime/rest/handler/HandlerRequest.java    |   8 +-
 .../rest/handler/job/BlobServerPortHandler.java |  66 -----
 .../rest/handler/job/JobSubmitHandler.java      | 136 +++++++++-
 .../AbstractTaskManagerFileHandler.java         |   4 +-
 .../rest/messages/BlobServerPortHeaders.java    |  74 ------
 .../messages/BlobServerPortResponseBody.java    |  57 -----
 .../rest/messages/job/JobSubmitHeaders.java     |  11 +-
 .../rest/messages/job/JobSubmitRequestBody.java | 115 ++++++---
 .../flink/runtime/rest/util/RestConstants.java  |   6 +-
 .../flink/runtime/client/ClientUtilsTest.java   |   4 +-
 .../flink/runtime/rest/AbstractHandlerTest.java |  19 +-
 .../runtime/rest/MultipartUploadResource.java   |   7 +-
 .../runtime/rest/RestServerEndpointITCase.java  |   3 +-
 .../runtime/rest/handler/FileUploadsTest.java   |   6 +-
 .../handler/job/BlobServerPortHandlerTest.java  | 101 --------
 .../rest/handler/job/JobSubmitHandlerTest.java  | 188 ++++++++++++--
 .../messages/BlobServerPortResponseTest.java    |  35 ---
 .../rest/messages/JobSubmitRequestBodyTest.java |   9 +-
 .../webmonitor/TestingDispatcherGateway.java    | 203 +++++++++++++++
 .../webmonitor/TestingRestfulGateway.java       |  30 +--
 34 files changed, 951 insertions(+), 763 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/docs/_includes/generated/rest_dispatcher.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rest_dispatcher.html 
b/docs/_includes/generated/rest_dispatcher.html
index c74da9c..6ed59be 100644
--- a/docs/_includes/generated/rest_dispatcher.html
+++ b/docs/_includes/generated/rest_dispatcher.html
@@ -1,50 +1,6 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/blobserver/port</strong></td>
-    </tr>
-    <tr>
-      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
-      <td class="text-left">Response code: <code>200 OK</code></td>
-    </tr>
-    <tr>
-      <td colspan="2">Returns the port of blob server which can be used to 
upload jars.</td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <button data-toggle="collapse" 
data-target="#607508253">Request</button>
-        <div id="607508253" class="collapse">
-          <pre>
-            <code>
-{}            </code>
-          </pre>
-         </div>
-      </td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <button data-toggle="collapse" 
data-target="#1913718109">Response</button>
-        <div id="1913718109" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:BlobServerPortResponseBody",
-  "properties" : {
-    "port" : {
-      "type" : "integer"
-    }
-  }
-}            </code>
-          </pre>
-         </div>
-      </td>
-    </tr>
-  </tbody>
-</table>
-<table class="table table-bordered">
-  <tbody>
-    <tr>
       <td class="text-left" colspan="2"><strong>/cluster</strong></td>
     </tr>
     <tr>
@@ -226,19 +182,11 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=#pa
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" 
data-target="#919877128">Request</button>
-        <div id="919877128" class="collapse">
+        <button data-toggle="collapse" 
data-target="#-1290030289">Request</button>
+        <div id="-1290030289" class="collapse">
           <pre>
             <code>
-{
-  "type" : "object",
-  "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:FileUpload",
-  "properties" : {
-    "path" : {
-      "type" : "string"
-    }
-  }
-}            </code>
+{}            </code>
           </pre>
          </div>
       </td>
@@ -607,7 +555,7 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=#pa
       <td class="text-left">Response code: <code>202 Accepted</code></td>
     </tr>
     <tr>
-      <td colspan="2">Submits a job. This call is primarily intended to be 
used by the Flink client.</td>
+      <td colspan="2">Submits a job. This call is primarily intended to be 
used by the Flink client. This call expects amultipart/form-data request that 
consists of file uploads for the serialized JobGraph, jars anddistributed cache 
artifacts and an attribute named "request"for the JSON payload.</td>
     </tr>
     <tr>
       <td colspan="2">
@@ -619,10 +567,28 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=#pa
   "type" : "object",
   "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody",
   "properties" : {
-    "serializedJobGraph" : {
+    "jobGraphFileName" : {
+      "type" : "string"
+    },
+    "jobJarFileNames" : {
       "type" : "array",
       "items" : {
-        "type" : "integer"
+        "type" : "string"
+      }
+    },
+    "jobArtifactFileNames" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobSubmitRequestBody:DistributedCacheFile",
+        "properties" : {
+          "entryName" : {
+            "type" : "string"
+          },
+          "fileName" : {
+            "type" : "string"
+          }
+        }
       }
     }
   }
@@ -2461,79 +2427,6 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=#pa
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" 
colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/accumulators</strong></td>
-    </tr>
-    <tr>
-      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
-      <td class="text-left">Response code: <code>200 OK</code></td>
-    </tr>
-    <tr>
-      <td colspan="2">Returns user-defined accumulators of a task, aggregated 
across all subtasks.</td>
-    </tr>
-    <tr>
-      <td colspan="2">Path parameters</td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <ul>
-<li><code>jobid</code> - description</li>
-<li><code>vertexid</code> - description</li>
-        </ul>
-      </td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <button data-toggle="collapse" 
data-target="#485581006">Request</button>
-        <div id="485581006" class="collapse">
-          <pre>
-            <code>
-{}            </code>
-          </pre>
-         </div>
-      </td>
-    </tr>
-    <tr>
-      <td colspan="2">
-        <button data-toggle="collapse" 
data-target="#-1070353054">Response</button>
-        <div id="-1070353054" class="collapse">
-          <pre>
-            <code>
-{
-  "type" : "object",
-  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexAccumulatorsInfo",
-  "properties" : {
-    "id" : {
-      "type" : "string"
-    },
-    "user-accumulators" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
-        "properties" : {
-          "name" : {
-            "type" : "string"
-          },
-          "type" : {
-            "type" : "string"
-          },
-          "value" : {
-            "type" : "string"
-          }
-        }
-      }
-    }
-  }
-}            </code>
-          </pre>
-         </div>
-      </td>
-    </tr>
-  </tbody>
-</table>
-<table class="table table-bordered">
-  <tbody>
-    <tr>
       <td class="text-left" 
colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/backpressure</strong></td>
     </tr>
     <tr>
@@ -2675,6 +2568,100 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=#pa
 <table class="table table-bordered">
   <tbody>
     <tr>
+      <td class="text-left" 
colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/accumulators</strong></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Returns all user-defined accumulators for all subtasks 
of a task.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" 
data-target="#-886388859">Request</button>
+        <div id="-886388859" class="collapse">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" 
data-target="#112317594">Response</button>
+        <div id="112317594" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo",
+  "properties" : {
+    "id" : {
+      "type" : "any"
+    },
+    "parallelism" : {
+      "type" : "integer"
+    },
+    "subtasks" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:SubtasksAllAccumulatorsInfo:SubtaskAccumulatorsInfo",
+        "properties" : {
+          "subtask" : {
+            "type" : "integer"
+          },
+          "attempt" : {
+            "type" : "integer"
+          },
+          "host" : {
+            "type" : "string"
+          },
+          "user-accumulators" : {
+            "type" : "array",
+            "items" : {
+              "type" : "object",
+              "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:UserAccumulator",
+              "properties" : {
+                "name" : {
+                  "type" : "string"
+                },
+                "type" : {
+                  "type" : "string"
+                },
+                "value" : {
+                  "type" : "string"
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
+<table class="table table-bordered">
+  <tbody>
+    <tr>
       <td class="text-left" 
colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/metrics</strong></td>
     </tr>
     <tr>
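
For reference, the "request" attribute of the new multipart submission, matching the reworked
JobSubmitRequestBody schema above, could look roughly like the following (a hedged illustration;
all file names are placeholders, and the referenced files travel as separate multipart/form-data
parts of the same request):

{
  "jobGraphFileName" : "flink-jobgraph.bin",
  "jobJarFileNames" : [ "job.jar", "udf-library.jar" ],
  "jobArtifactFileNames" : [
    { "entryName" : "cachedWords", "fileName" : "words.txt" }
  ]
}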

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 85699d7..935a07f 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -22,15 +22,16 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.NewClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobClient;
-import org.apache.flink.runtime.client.ClientUtils;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.FileUpload;
 import org.apache.flink.runtime.rest.RestClient;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
@@ -49,8 +51,6 @@ import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusHeader
 import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingStatusMessageParameters;
 import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerHeaders;
 import 
org.apache.flink.runtime.rest.handler.job.rescaling.RescalingTriggerMessageParameters;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -87,10 +87,10 @@ import 
org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerReq
 import 
org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
 import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
 import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.runtime.util.ScalaUtils;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
@@ -102,20 +102,20 @@ import org.apache.flink.util.function.CheckedSupplier;
 import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
-import akka.actor.AddressFromURIString;
-
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.io.ObjectOutputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -315,36 +315,61 @@ public class RestClusterClient<T> extends 
ClusterClient<T> implements NewCluster
                // we have to enable queued scheduling because slot will be 
allocated lazily
                jobGraph.setAllowQueuedScheduling(true);
 
-               log.info("Requesting blob server port.");
-               CompletableFuture<BlobServerPortResponseBody> portFuture = 
sendRequest(BlobServerPortHeaders.getInstance());
+               CompletableFuture<java.nio.file.Path> jobGraphFileFuture = 
CompletableFuture.supplyAsync(() -> {
+                       try {
+                               final java.nio.file.Path jobGraphFile = 
Files.createTempFile("flink-jobgraph", ".bin");
+                               try (ObjectOutputStream objectOut = new 
ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+                                       objectOut.writeObject(jobGraph);
+                               }
+                               return jobGraphFile;
+                       } catch (IOException e) {
+                               throw new CompletionException(new 
FlinkException("Failed to serialize JobGraph.", e));
+                       }
+               }, executorService);
 
-               CompletableFuture<JobGraph> jobUploadFuture = 
portFuture.thenCombine(
-                       getDispatcherAddress(),
-                       (BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
-                               final int blobServerPort = response.port;
-                               final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
+               CompletableFuture<Tuple2<JobSubmitRequestBody, 
Collection<FileUpload>>> requestFuture = 
jobGraphFileFuture.thenApply(jobGraphFile -> {
+                       List<String> jarFileNames = new ArrayList<>(8);
+                       List<JobSubmitRequestBody.DistributedCacheFile> 
artifactFileNames = new ArrayList<>(8);
+                       Collection<FileUpload> filesToUpload = new 
ArrayList<>(8);
 
-                               try {
-                                       
ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, 
flinkConfig));
-                               } catch (Exception e) {
-                                       throw new CompletionException(e);
-                               }
+                       filesToUpload.add(new FileUpload(jobGraphFile, 
RestConstants.CONTENT_TYPE_BINARY));
 
-                               return jobGraph;
-                       });
+                       for (Path jar : jobGraph.getUserJars()) {
+                               jarFileNames.add(jar.getName());
+                               filesToUpload.add(new 
FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+                       }
 
-               CompletableFuture<JobSubmitResponseBody> submissionFuture = 
jobUploadFuture.thenCompose(
-                       (JobGraph jobGraphToSubmit) -> {
-                               log.info("Submitting job graph.");
+                       for (Map.Entry<String, 
DistributedCache.DistributedCacheEntry> artifacts : 
jobGraph.getUserArtifacts().entrySet()) {
+                               artifactFileNames.add(new 
JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new 
Path(artifacts.getValue().filePath).getName()));
+                               filesToUpload.add(new 
FileUpload(Paths.get(artifacts.getValue().filePath), 
RestConstants.CONTENT_TYPE_BINARY));
+                       }
 
-                               try {
-                                       return sendRequest(
-                                               JobSubmitHeaders.getInstance(),
-                                               new 
JobSubmitRequestBody(jobGraph));
-                               } catch (IOException ioe) {
-                                       throw new CompletionException(new 
FlinkException("Could not create JobSubmitRequestBody.", ioe));
-                               }
-                       });
+                       final JobSubmitRequestBody requestBody = new 
JobSubmitRequestBody(
+                               jobGraphFile.getFileName().toString(),
+                               jarFileNames,
+                               artifactFileNames);
+
+                       return Tuple2.of(requestBody, 
Collections.unmodifiableCollection(filesToUpload));
+               });
+
+               final CompletableFuture<JobSubmitResponseBody> submissionFuture 
= requestFuture.thenCompose(
+                       requestAndFileUploads -> sendRetriableRequest(
+                               JobSubmitHeaders.getInstance(),
+                               EmptyMessageParameters.getInstance(),
+                               requestAndFileUploads.f0,
+                               requestAndFileUploads.f1,
+                               isConnectionProblemOrServiceUnavailable())
+               );
+
+               submissionFuture
+                       .thenCombine(jobGraphFileFuture, (ignored, 
jobGraphFile) -> jobGraphFile)
+                       .thenAccept(jobGraphFile -> {
+                       try {
+                               Files.delete(jobGraphFile);
+                       } catch (IOException e) {
+                               log.warn("Could not delete temporary file {}.", 
jobGraphFile, e);
+                       }
+               });
 
                return submissionFuture
                        .thenApply(
@@ -676,9 +701,14 @@ public class RestClusterClient<T> extends ClusterClient<T> 
implements NewCluster
 
        private <M extends MessageHeaders<R, P, U>, U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture<P>
                        sendRetriableRequest(M messageHeaders, U 
messageParameters, R request, Predicate<Throwable> retryPredicate) {
+               return sendRetriableRequest(messageHeaders, messageParameters, 
request, Collections.emptyList(), retryPredicate);
+       }
+
+       private <M extends MessageHeaders<R, P, U>, U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody> 
CompletableFuture<P>
+       sendRetriableRequest(M messageHeaders, U messageParameters, R request, 
Collection<FileUpload> filesToUpload, Predicate<Throwable> retryPredicate) {
                return retry(() -> 
getWebMonitorBaseUrl().thenCompose(webMonitorBaseUrl -> {
                        try {
-                               return 
restClient.sendRequest(webMonitorBaseUrl.getHost(), 
webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request);
+                               return 
restClient.sendRequest(webMonitorBaseUrl.getHost(), 
webMonitorBaseUrl.getPort(), messageHeaders, messageParameters, request, 
filesToUpload);
                        } catch (IOException e) {
                                throw new CompletionException(e);
                        }
@@ -736,26 +766,4 @@ public class RestClusterClient<T> extends ClusterClient<T> 
implements NewCluster
                                }
                        }, executorService);
        }
-
-       private CompletableFuture<String> getDispatcherAddress() {
-               return FutureUtils.orTimeout(
-                               dispatcherLeaderRetriever.getLeaderFuture(),
-                               
restClusterClientConfiguration.getAwaitLeaderTimeout(),
-                               TimeUnit.MILLISECONDS)
-                       .thenApplyAsync(leaderAddressSessionId -> {
-                               final String address = 
leaderAddressSessionId.f0;
-                               final Optional<String> host = 
ScalaUtils.<String>toJava(AddressFromURIString.parse(address).host());
-
-                               return host.orElseGet(() -> {
-                                       // if the dispatcher address does not 
contain a host part, then assume it's running
-                                       // on the same machine as the client
-                                       log.info("The dispatcher seems to run 
without remoting enabled. This indicates that we are " +
-                                               "in a test. This can only work 
if the RestClusterClient runs on the same machine. " +
-                                               "Assuming, therefore, 
'localhost' as the host.");
-
-                                       return "localhost";
-                               });
-                       }, executorService);
-       }
-
 }
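
For readers following the reworked submitJob flow, here is a condensed, hedged sketch of the
client-side assembly it now performs, using only the names and types visible in the diff above.
The helper name buildSubmitRequest is hypothetical, the async wiring, retries and temp-file
cleanup are omitted, and the imports added in this commit (FileUpload, RestConstants,
JobSubmitRequestBody, DistributedCache, org.apache.flink.core.fs.Path, java.nio.file.*) are assumed:

    // Sketch only: condenses the logic of RestClusterClient#submitJob shown above.
    private static Tuple2<JobSubmitRequestBody, Collection<FileUpload>> buildSubmitRequest(JobGraph jobGraph) throws IOException {
        // serialize the JobGraph into a temporary file that is uploaded alongside jars/artifacts
        final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
        try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
            objectOut.writeObject(jobGraph);
        }

        final List<String> jarFileNames = new ArrayList<>();
        final List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>();
        final Collection<FileUpload> filesToUpload = new ArrayList<>();

        filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));

        // user jars are referenced by name in the request body and uploaded as file parts
        for (Path jar : jobGraph.getUserJars()) {
            jarFileNames.add(jar.getName());
            filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
        }

        // distributed cache artifacts keep their entry name so the server can re-associate them
        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifact : jobGraph.getUserArtifacts().entrySet()) {
            artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(
                artifact.getKey(), new Path(artifact.getValue().filePath).getName()));
            filesToUpload.add(new FileUpload(Paths.get(artifact.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
        }

        final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
            jobGraphFile.getFileName().toString(), jarFileNames, artifactFileNames);

        return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
    }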

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index f025d67..75f16c0 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -53,8 +53,6 @@ import 
org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
 import 
org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -226,7 +224,6 @@ public class RestClusterClientTest extends TestLogger {
 
        @Test
        public void testJobSubmitCancelStop() throws Exception {
-               TestBlobServerPortHandler portHandler = new 
TestBlobServerPortHandler();
                TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
                TestJobTerminationHandler terminationHandler = new 
TestJobTerminationHandler();
                TestJobExecutionResultHandler testJobExecutionResultHandler =
@@ -237,15 +234,12 @@ public class RestClusterClientTest extends TestLogger {
                                        .build()));
 
                try (TestRestServerEndpoint ignored = createRestServerEndpoint(
-                       portHandler,
                        submitHandler,
                        terminationHandler,
                        testJobExecutionResultHandler)) {
 
-                       Assert.assertFalse(portHandler.portRetrieved);
                        Assert.assertFalse(submitHandler.jobSubmitted);
                        restClusterClient.submitJob(jobGraph, 
ClassLoader.getSystemClassLoader());
-                       Assert.assertTrue(portHandler.portRetrieved);
                        Assert.assertTrue(submitHandler.jobSubmitted);
 
                        Assert.assertFalse(terminationHandler.jobCanceled);
@@ -264,11 +258,9 @@ public class RestClusterClientTest extends TestLogger {
        @Test
        public void testDetachedJobSubmission() throws Exception {
 
-               final TestBlobServerPortHandler testBlobServerPortHandler = new 
TestBlobServerPortHandler();
                final TestJobSubmitHandler testJobSubmitHandler = new 
TestJobSubmitHandler();
 
                try (TestRestServerEndpoint ignored = createRestServerEndpoint(
-                       testBlobServerPortHandler,
                        testJobSubmitHandler)) {
 
                        restClusterClient.setDetached(true);
@@ -282,20 +274,6 @@ public class RestClusterClientTest extends TestLogger {
 
        }
 
-       private class TestBlobServerPortHandler extends 
TestHandler<EmptyRequestBody, BlobServerPortResponseBody, 
EmptyMessageParameters> {
-               private volatile boolean portRetrieved = false;
-
-               private TestBlobServerPortHandler() {
-                       super(BlobServerPortHeaders.getInstance());
-               }
-
-               @Override
-               protected CompletableFuture<BlobServerPortResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-                       portRetrieved = true;
-                       return CompletableFuture.completedFuture(new 
BlobServerPortResponseBody(12000));
-               }
-       }
-
        private class TestJobSubmitHandler extends 
TestHandler<JobSubmitRequestBody, JobSubmitResponseBody, 
EmptyMessageParameters> {
                private volatile boolean jobSubmitted = false;
 
@@ -390,7 +368,6 @@ public class RestClusterClientTest extends TestLogger {
 
                try (TestRestServerEndpoint ignored = createRestServerEndpoint(
                        testJobExecutionResultHandler,
-                       new TestBlobServerPortHandler(),
                        new TestJobSubmitHandler())) {
 
                        JobExecutionResult jobExecutionResult;

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 10387c8..1e620d4 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -114,7 +114,7 @@ public class JarRunHandler extends
                CompletableFuture<JobGraph> jarUploadFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
                        final InetSocketAddress address = new 
InetSocketAddress(getDispatcherHost(gateway), blobServerPort);
                        try {
-                               ClientUtils.uploadJobGraphFiles(jobGraph, () -> 
new BlobClient(address, configuration));
+                               
ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new 
BlobClient(address, configuration));
                        } catch (FlinkException e) {
                                throw new CompletionException(e);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index a1ef82b..83db224 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import javax.annotation.Nonnull;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -70,11 +71,11 @@ public class JarUploadHandler extends
        protected CompletableFuture<JarUploadResponseBody> handleRequest(
                        @Nonnull final HandlerRequest<EmptyRequestBody, 
EmptyMessageParameters> request,
                        @Nonnull final RestfulGateway gateway) throws 
RestHandlerException {
-               Collection<Path> uploadedFiles = request.getUploadedFiles();
+               Collection<File> uploadedFiles = request.getUploadedFiles();
                if (uploadedFiles.size() != 1) {
                        throw new RestHandlerException("Exactly 1 file must be 
sent, received " + uploadedFiles.size() + '.', HttpResponseStatus.BAD_REQUEST);
                }
-               final Path fileUpload = uploadedFiles.iterator().next();
+               final Path fileUpload = 
uploadedFiles.iterator().next().toPath();
                return CompletableFuture.supplyAsync(() -> {
                        if 
(!fileUpload.getFileName().toString().endsWith(".jar")) {
                                throw new CompletionException(new 
RestHandlerException(

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
index 812d4c6..c9e25ed 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
@@ -131,6 +131,6 @@ public class JarUploadHandlerTest extends TestLogger {
                        EmptyMessageParameters.getInstance(),
                        Collections.emptyMap(),
                        Collections.emptyMap(),
-                       Collections.singleton(uploadedFile));
+                       Collections.singleton(uploadedFile.toFile()));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
index fc6a621..06baaaf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.client;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
@@ -32,7 +31,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -43,20 +41,41 @@ public enum ClientUtils {
        ;
 
        /**
-        * Uploads all files required for the execution of the given {@link 
JobGraph} using the {@link BlobClient} from
-        * the given {@link Supplier}.
+        * Extracts all files required for the execution from the given {@link 
JobGraph} and uploads them using the {@link BlobClient}
+        * from the given {@link Supplier}.
         *
         * @param jobGraph jobgraph requiring files
         * @param clientSupplier supplier of blob client to upload files with
-        * @throws IOException if the upload fails
+        * @throws FlinkException if the upload fails
         */
-       public static void uploadJobGraphFiles(JobGraph jobGraph, 
SupplierWithException<BlobClient, IOException> clientSupplier) throws 
FlinkException {
+       public static void extractAndUploadJobGraphFiles(JobGraph jobGraph, 
SupplierWithException<BlobClient, IOException> clientSupplier) throws 
FlinkException {
                List<Path> userJars = jobGraph.getUserJars();
-               Map<String, DistributedCache.DistributedCacheEntry> 
userArtifacts = jobGraph.getUserArtifacts();
+               Collection<Tuple2<String, Path>> userArtifacts = 
jobGraph.getUserArtifacts().entrySet().stream()
+                       .map(entry -> Tuple2.of(entry.getKey(), new 
Path(entry.getValue().filePath)))
+                       .collect(Collectors.toList());
+
+               uploadJobGraphFiles(jobGraph, userJars, userArtifacts, 
clientSupplier);
+       }
+
+       /**
+        * Uploads the given jars and artifacts required for the execution of 
the given {@link JobGraph} using the {@link BlobClient} from
+        * the given {@link Supplier}.
+        *
+        * @param jobGraph jobgraph requiring files
+        * @param userJars jars to upload
+        * @param userArtifacts artifacts to upload
+        * @param clientSupplier supplier of blob client to upload files with
+        * @throws FlinkException if the upload fails
+        */
+       public static void uploadJobGraphFiles(
+                       JobGraph jobGraph,
+                       Collection<Path> userJars,
+                       Collection<Tuple2<String, 
org.apache.flink.core.fs.Path>> userArtifacts,
+                       SupplierWithException<BlobClient, IOException> 
clientSupplier) throws FlinkException {
                if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
                        try (BlobClient client = clientSupplier.get()) {
-                               uploadAndSetUserJars(jobGraph, client);
-                               uploadAndSetUserArtifacts(jobGraph, client);
+                               uploadAndSetUserJars(jobGraph, userJars, 
client);
+                               uploadAndSetUserArtifacts(jobGraph, 
userArtifacts, client);
                        } catch (IOException ioe) {
                                throw new FlinkException("Could not upload job 
files.", ioe);
                        }
@@ -64,15 +83,15 @@ public enum ClientUtils {
        }
 
        /**
-        * Uploads the user jars from the given {@link JobGraph} using the 
given {@link BlobClient},
-        * and sets the appropriate blobkeys.
+        * Uploads the given user jars using the given {@link BlobClient}, and 
sets the appropriate blobkeys on the given {@link JobGraph}.
         *
-        * @param jobGraph   jobgraph requiring user jars
+        * @param jobGraph jobgraph requiring user jars
+        * @param userJars jars to upload
         * @param blobClient client to upload jars with
         * @throws IOException if the upload fails
         */
-       private static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient 
blobClient) throws IOException {
-               Collection<PermanentBlobKey> blobKeys = 
uploadUserJars(jobGraph.getJobID(), jobGraph.getUserJars(), blobClient);
+       private static void uploadAndSetUserJars(JobGraph jobGraph, 
Collection<Path> userJars, BlobClient blobClient) throws IOException {
+               Collection<PermanentBlobKey> blobKeys = 
uploadUserJars(jobGraph.getJobID(), userJars, blobClient);
                setUserJarBlobKeys(blobKeys, jobGraph);
        }
 
@@ -90,18 +109,14 @@ public enum ClientUtils {
        }
 
        /**
-        * Uploads the user artifacts from the given {@link JobGraph} using the 
given {@link BlobClient},
-        * and sets the appropriate blobkeys.
+        * Uploads the given user artifacts using the given {@link BlobClient}, 
and sets the appropriate blobkeys on the given {@link JobGraph}.
         *
         * @param jobGraph jobgraph requiring user artifacts
+        * @param artifactPaths artifacts to upload
         * @param blobClient client to upload artifacts with
         * @throws IOException if the upload fails
         */
-       private static void uploadAndSetUserArtifacts(JobGraph jobGraph, 
BlobClient blobClient) throws IOException {
-               Collection<Tuple2<String, Path>> artifactPaths = 
jobGraph.getUserArtifacts().entrySet().stream()
-                       .map(entry -> Tuple2.of(entry.getKey(), new 
Path(entry.getValue().filePath)))
-                       .collect(Collectors.toList());
-
+       private static void uploadAndSetUserArtifacts(JobGraph jobGraph, 
Collection<Tuple2<String, Path>> artifactPaths, BlobClient blobClient) throws 
IOException {
                Collection<Tuple2<String, PermanentBlobKey>> blobKeys = 
uploadUserArtifacts(jobGraph.getJobID(), artifactPaths, blobClient);
                setUserArtifactBlobKeys(jobGraph, blobKeys);
        }
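
A hedged usage sketch of the two ClientUtils entry points after this split, based on the
signatures above; jobGraph, blobServerAddress (an InetSocketAddress) and flinkConfig (a
Configuration) are assumed to be provided by the caller:

    // 1) Convenience path: extract jars/artifacts from the JobGraph itself.
    ClientUtils.extractAndUploadJobGraphFiles(
        jobGraph, () -> new BlobClient(blobServerAddress, flinkConfig));

    // 2) Explicit path: the caller supplies the jar paths and the
    //    (entry name, path) artifact tuples to upload.
    Collection<Path> userJars = jobGraph.getUserJars();
    Collection<Tuple2<String, Path>> userArtifacts = jobGraph.getUserArtifacts().entrySet().stream()
        .map(entry -> Tuple2.of(entry.getKey(), new Path(entry.getValue().filePath)))
        .collect(Collectors.toList());
    ClientUtils.uploadJobGraphFiles(
        jobGraph, userJars, userArtifacts, () -> new BlobClient(blobServerAddress, flinkConfig));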

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 27da3b8..635def2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -424,7 +424,7 @@ public class JobClient {
                }
 
                try {
-                       ClientUtils.uploadJobGraphFiles(jobGraph, () -> new 
BlobClient(blobServerAddress, config));
+                       ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () 
-> new BlobClient(blobServerAddress, config));
                } catch (FlinkException e) {
                        throw new JobSubmissionException(jobGraph.getJobID(),
                                        "Could not upload job files.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
index 2783b09..89978fa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobSubmissionClientActor.java
@@ -155,7 +155,7 @@ public class JobSubmissionClientActor extends 
JobClientActor {
                final CompletableFuture<Void> jarUploadFuture = 
blobServerAddressFuture.thenAcceptAsync(
                        (InetSocketAddress blobServerAddress) -> {
                                try {
-                                       
ClientUtils.uploadJobGraphFiles(jobGraph, () -> new 
BlobClient(blobServerAddress, clientConfig));
+                                       
ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new 
BlobClient(blobServerAddress, clientConfig));
                                } catch (FlinkException e) {
                                        throw new CompletionException(e);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 8072cf4..4279330 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
-import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
@@ -88,17 +87,12 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
 
                final Time timeout = restConfiguration.getTimeout();
 
-               BlobServerPortHandler blobServerPortHandler = new 
BlobServerPortHandler(
-                       restAddressFuture,
-                       leaderRetriever,
-                       timeout,
-                       responseHeaders);
-
                JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
-                       responseHeaders);
+                       responseHeaders,
+                       executor);
 
                if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
                        try {
@@ -125,7 +119,6 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
                        log.info("Web-based job submission is not enabled.");
                }
 
-               
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), 
blobServerPortHandler));
                handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), 
jobSubmitHandler));
 
                return handlers;

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 4fab2b8..97ab5a5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -676,7 +676,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
        private CompletableFuture<Void> uploadAndSetJobFiles(final 
CompletableFuture<InetSocketAddress> blobServerAddressFuture, final JobGraph 
job) {
                return blobServerAddressFuture.thenAccept(blobServerAddress -> {
                        try {
-                               ClientUtils.uploadJobGraphFiles(job, () -> new 
BlobClient(blobServerAddress, miniClusterConfiguration.getConfiguration()));
+                               ClientUtils.extractAndUploadJobGraphFiles(job, 
() -> new BlobClient(blobServerAddress, 
miniClusterConfiguration.getConfiguration()));
                        } catch (FlinkException e) {
                                throw new CompletionException(e);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
index 1e88425..0d8605a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
@@ -49,7 +49,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -82,98 +84,84 @@ public abstract class AbstractHandler<T extends 
RestfulGateway, R extends Reques
        }
 
        @Override
-       protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest 
routedRequest, T gateway) throws Exception {
+       protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest 
routedRequest, T gateway) {
                HttpRequest httpRequest = routedRequest.getRequest();
                if (log.isTraceEnabled()) {
                        log.trace("Received request " + httpRequest.uri() + 
'.');
                }
 
+               FileUploads uploadedFiles = null;
                try {
                        if (!(httpRequest instanceof FullHttpRequest)) {
                                // The RestServerEndpoint defines a 
HttpObjectAggregator in the pipeline that always returns
                                // FullHttpRequests.
                                log.error("Implementation error: Received a 
request that wasn't a FullHttpRequest.");
-                               HandlerUtils.sendErrorResponse(
-                                       ctx,
-                                       httpRequest,
-                                       new ErrorResponseBody("Bad request 
received."),
-                                       HttpResponseStatus.BAD_REQUEST,
-                                       responseHeaders);
-                               return;
+                               throw new RestHandlerException("Bad request 
received.", HttpResponseStatus.BAD_REQUEST);
                        }
 
                        final ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
 
-                       try (FileUploads uploadedFiles = 
FileUploadHandler.getMultipartFileUploads(ctx)) {
+                       uploadedFiles = 
FileUploadHandler.getMultipartFileUploads(ctx);
 
-                               if 
(!untypedResponseMessageHeaders.acceptsFileUploads() && 
!uploadedFiles.getUploadedFiles().isEmpty()) {
-                                       HandlerUtils.sendErrorResponse(
-                                               ctx,
-                                               httpRequest,
-                                               new ErrorResponseBody("File 
uploads not allowed."),
-                                               HttpResponseStatus.BAD_REQUEST,
-                                               responseHeaders);
-                                       return;
-                               }
+                       if (!untypedResponseMessageHeaders.acceptsFileUploads() 
&& !uploadedFiles.getUploadedFiles().isEmpty()) {
+                               throw new RestHandlerException("File uploads 
not allowed.", HttpResponseStatus.BAD_REQUEST);
+                       }
 
-                               R request;
-                               if (msgContent.capacity() == 0) {
-                                       try {
-                                               request = 
MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
-                                       } catch (JsonParseException | 
JsonMappingException je) {
-                                               log.error("Request did not 
conform to expected format.", je);
-                                               HandlerUtils.sendErrorResponse(
-                                                       ctx,
-                                                       httpRequest,
-                                                       new 
ErrorResponseBody("Bad request received."),
-                                                       
HttpResponseStatus.BAD_REQUEST,
-                                                       responseHeaders);
-                                               return;
-                                       }
-                               } else {
-                                       try {
-                                               ByteBufInputStream in = new 
ByteBufInputStream(msgContent);
-                                               request = MAPPER.readValue(in, 
untypedResponseMessageHeaders.getRequestClass());
-                                       } catch (JsonParseException | 
JsonMappingException je) {
-                                               log.error("Failed to read 
request.", je);
-                                               HandlerUtils.sendErrorResponse(
-                                                       ctx,
-                            httpRequest,
-                            new ErrorResponseBody(String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName())),
-                            HttpResponseStatus.BAD_REQUEST,
-                            responseHeaders);
-                        return;
-                    }
+            R request;
+            if (msgContent.capacity() == 0) {
+                try {
+                    request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+                } catch (JsonParseException | JsonMappingException je) {
+                    log.error("Request did not conform to expected format.", je);
+                    throw new RestHandlerException("Bad request received.", HttpResponseStatus.BAD_REQUEST, je);
                 }
-
-                final HandlerRequest<R, M> handlerRequest;
-
+            } else {
                 try {
-                    handlerRequest = new HandlerRequest<R, M>(
-                        request,
-                        untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
-                        routedRequest.getRouteResult().pathParams(),
-                        routedRequest.getRouteResult().queryParams(),
-                        uploadedFiles.getUploadedFiles());
-                } catch (HandlerRequestException hre) {
-                    log.error("Could not create the handler request.", hre);
-
-                    HandlerUtils.sendErrorResponse(
-                        ctx,
-                        httpRequest,
-                        new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())),
+                    ByteBufInputStream in = new ByteBufInputStream(msgContent);
+                    request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
+                } catch (JsonParseException | JsonMappingException je) {
+                    log.error("Failed to read request.", je);
+                    throw new RestHandlerException(
+                        String.format("Request did not match expected format %s.", untypedResponseMessageHeaders.getRequestClass().getSimpleName()),
                         HttpResponseStatus.BAD_REQUEST,
-                        responseHeaders);
-                    return;
+                        je);
                 }
+            }
 
-                respondToRequest(
-                    ctx,
-                    httpRequest,
-                    handlerRequest,
-                    gateway);
+            final HandlerRequest<R, M> handlerRequest;
+
+            try {
+                handlerRequest = new HandlerRequest<R, M>(
+                    request,
+                    untypedResponseMessageHeaders.getUnresolvedMessageParameters(),
+                    routedRequest.getRouteResult().pathParams(),
+                    routedRequest.getRouteResult().queryParams(),
+                    uploadedFiles.getUploadedFiles());
+            } catch (HandlerRequestException hre) {
+                log.error("Could not create the handler request.", hre);
+                throw new RestHandlerException(
+                    String.format("Bad request, could not parse parameters: %s", hre.getMessage()),
+                    HttpResponseStatus.BAD_REQUEST,
+                    hre);
             }
 
+            CompletableFuture<Void> requestProcessingFuture = respondToRequest(
+                ctx,
+                httpRequest,
+                handlerRequest,
+                gateway);
+
+            final FileUploads finalUploadedFiles = uploadedFiles;
+            requestProcessingFuture
+                .whenComplete((Void ignored, Throwable throwable) -> cleanupFileUploads(finalUploadedFiles));
+        } catch (RestHandlerException rhe) {
+            HandlerUtils.sendErrorResponse(
+                ctx,
+                httpRequest,
+                new ErrorResponseBody(rhe.getMessage()),
+                rhe.getHttpResponseStatus(),
+                responseHeaders);
+            cleanupFileUploads(uploadedFiles);
                } catch (Throwable e) {
                        log.error("Request processing failed.", e);
                        HandlerUtils.sendErrorResponse(
@@ -182,6 +170,17 @@ public abstract class AbstractHandler<T extends 
RestfulGateway, R extends Reques
                                new ErrorResponseBody("Internal server error."),
                                HttpResponseStatus.INTERNAL_SERVER_ERROR,
                                responseHeaders);
+                       cleanupFileUploads(uploadedFiles);
+               }
+       }
+
+       private void cleanupFileUploads(@Nullable FileUploads uploadedFiles) {
+               if (uploadedFiles != null) {
+                       try {
+                               uploadedFiles.close();
+                       } catch (IOException e) {
+                               log.warn("Could not cleanup uploaded files.", 
e);
+                       }
                }
        }
 
@@ -192,9 +191,10 @@ public abstract class AbstractHandler<T extends 
RestfulGateway, R extends Reques
         * @param httpRequest original http request
         * @param handlerRequest typed handler request
         * @param gateway leader gateway
+        * @return Future which is completed once the request has been processed
         * @throws RestHandlerException if an exception occurred while 
responding
         */
-       protected abstract void respondToRequest(
+       protected abstract CompletableFuture<Void> respondToRequest(
                ChannelHandlerContext ctx,
                HttpRequest httpRequest,
                HandlerRequest<R, M> handlerRequest,

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 448711b..e4cec08 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -70,7 +70,7 @@ public abstract class AbstractRestHandler<T extends 
RestfulGateway, R extends Re
        }
 
        @Override
-       protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest 
httpRequest, HandlerRequest<R, M> handlerRequest, T gateway) {
+       protected CompletableFuture<Void> 
respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, 
HandlerRequest<R, M> handlerRequest, T gateway) {
                CompletableFuture<P> response;
 
                try {
@@ -79,7 +79,7 @@ public abstract class AbstractRestHandler<T extends 
RestfulGateway, R extends Re
                        response = FutureUtils.completedExceptionally(e);
                }
 
-               response.whenComplete((P resp, Throwable throwable) -> {
+               return response.whenComplete((P resp, Throwable throwable) -> {
                        if (throwable != null) {
 
                                Throwable error = 
ExceptionUtils.stripCompletionException(throwable);
@@ -105,7 +105,7 @@ public abstract class AbstractRestHandler<T extends 
RestfulGateway, R extends Re
                                        messageHeaders.getResponseStatusCode(),
                                        responseHeaders);
                        }
-               });
+               }).thenApply(ignored -> null);
        }
 
        private void processRestHandlerException(ChannelHandlerContext ctx, 
HttpRequest httpRequest, RestHandlerException rhe) {

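The AbstractRestHandler change above keeps whenComplete for sending the response and then erases the result type with thenApply(ignored -> null), so that AbstractHandler can hook the file-upload cleanup onto the returned future. A rough, JDK-only illustration of that composition pattern follows; the class name, messages and printouts are invented for the example and are not part of this commit.

    import java.util.concurrent.CompletableFuture;

    public class CompletionCleanupSketch {

        public static void main(String[] args) {
            CompletableFuture<String> response = CompletableFuture.supplyAsync(() -> "hello");

            // Send the response as a side effect, then drop the value type,
            // mirroring the shape of AbstractRestHandler#respondToRequest.
            CompletableFuture<Void> processingFuture = response
                .whenComplete((value, error) -> {
                    if (error != null) {
                        System.err.println("request failed: " + error);
                    } else {
                        System.out.println("sending response: " + value);
                    }
                })
                .thenApply(ignored -> null);

            // The caller (AbstractHandler in this commit) attaches cleanup to completion.
            processingFuture
                .whenComplete((ignored, error) -> System.out.println("cleaning up uploaded files"))
                .join();
        }
    }
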
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
index 31ac47bb..b233cb5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
@@ -24,6 +24,7 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
@@ -58,7 +59,7 @@ public final class FileUploads implements AutoCloseable {
                this.uploadDirectory = uploadDirectory;
        }
 
-       public Collection<Path> getUploadedFiles() throws IOException {
+       public Collection<File> getUploadedFiles() throws IOException {
                if (uploadDirectory == null) {
                        return Collections.emptyList();
                }
@@ -78,9 +79,9 @@ public final class FileUploads implements AutoCloseable {
 
        private static final class FileAdderVisitor extends 
SimpleFileVisitor<Path> {
 
-               private final Collection<Path> files = new ArrayList<>(4);
+               private final Collection<File> files = new ArrayList<>(4);
 
-               Collection<Path> getContainedFiles() {
+               Collection<File> getContainedFiles() {
                        return files;
                }
 
@@ -90,7 +91,7 @@ public final class FileUploads implements AutoCloseable {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs) throws IOException {
                        FileVisitResult result = super.visitFile(file, attrs);
-                       files.add(file);
+                       files.add(file.toFile());
                        return result;
                }
        }

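FileUploads now hands handlers java.io.File instances that a SimpleFileVisitor collects while walking the upload directory. A self-contained sketch of the same traversal idiom, using only the JDK; the temporary directory and the file name are made up for illustration.

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.FileVisitResult;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.SimpleFileVisitor;
    import java.nio.file.attribute.BasicFileAttributes;
    import java.util.ArrayList;
    import java.util.Collection;

    public class UploadDirectoryScanSketch {

        public static void main(String[] args) throws IOException {
            Path uploadDirectory = Files.createTempDirectory("rest-upload-sketch");
            Files.createFile(uploadDirectory.resolve("job.graph"));

            Collection<File> files = new ArrayList<>();
            // Same idea as FileUploads.FileAdderVisitor: visit every regular file
            // below the upload directory and hand back java.io.File instances.
            Files.walkFileTree(uploadDirectory, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                    files.add(file.toFile());
                    return FileVisitResult.CONTINUE;
                }
            });

            System.out.println(files);
        }
    }
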
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
index 7e93556..990dae5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
-import java.nio.file.Path;
+import java.io.File;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,7 +43,7 @@ import java.util.StringJoiner;
 public class HandlerRequest<R extends RequestBody, M extends 
MessageParameters> {
 
        private final R requestBody;
-       private final Collection<Path> uploadedFiles;
+       private final Collection<File> uploadedFiles;
        private final Map<Class<? extends MessagePathParameter<?>>, 
MessagePathParameter<?>> pathParameters = new HashMap<>(2);
        private final Map<Class<? extends MessageQueryParameter<?>>, 
MessageQueryParameter<?>> queryParameters = new HashMap<>(2);
 
@@ -55,7 +55,7 @@ public class HandlerRequest<R extends RequestBody, M extends 
MessageParameters>
                this(requestBody, messageParameters, receivedPathParameters, 
receivedQueryParameters, Collections.emptyList());
        }
 
-       public HandlerRequest(R requestBody, M messageParameters, Map<String, 
String> receivedPathParameters, Map<String, List<String>> 
receivedQueryParameters, Collection<Path> uploadedFiles) throws 
HandlerRequestException {
+       public HandlerRequest(R requestBody, M messageParameters, Map<String, 
String> receivedPathParameters, Map<String, List<String>> 
receivedQueryParameters, Collection<File> uploadedFiles) throws 
HandlerRequestException {
                this.requestBody = Preconditions.checkNotNull(requestBody);
                this.uploadedFiles = 
Collections.unmodifiableCollection(Preconditions.checkNotNull(uploadedFiles));
                Preconditions.checkNotNull(messageParameters);
@@ -141,7 +141,7 @@ public class HandlerRequest<R extends RequestBody, M 
extends MessageParameters>
        }
 
        @Nonnull
-       public Collection<Path> getUploadedFiles() {
+       public Collection<File> getUploadedFiles() {
                return uploadedFiles;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
deleted file mode 100644
index 4b5fa89..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
-import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
-import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.ExceptionUtils;
-
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import javax.annotation.Nonnull;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-
-/**
- * This handler can be used to retrieve the port that the blob server runs on.
- */
-public final class BlobServerPortHandler extends 
AbstractRestHandler<DispatcherGateway, EmptyRequestBody, 
BlobServerPortResponseBody, EmptyMessageParameters> {
-
-       public BlobServerPortHandler(
-                       CompletableFuture<String> localRestAddress,
-                       GatewayRetriever<? extends DispatcherGateway> 
leaderRetriever,
-                       Time timeout,
-                       Map<String, String> headers) {
-               super(localRestAddress, leaderRetriever, timeout, headers, 
BlobServerPortHeaders.getInstance());
-       }
-
-       @Override
-       protected CompletableFuture<BlobServerPortResponseBody> 
handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> 
request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-               return gateway
-                       .getBlobServerPort(timeout)
-                       .thenApply(BlobServerPortResponseBody::new)
-                       .exceptionally(error -> {
-                               throw new CompletionException(new 
RestHandlerException(
-                                       "Failed to retrieve blob server port.",
-                                       
HttpResponseStatus.INTERNAL_SERVER_ERROR,
-                                       
ExceptionUtils.stripCompletionException(error)));
-                       });
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
index af04629..052b056 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -19,8 +19,14 @@
 package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -29,47 +35,153 @@ import 
org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import javax.annotation.Nonnull;
 
-import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
 
 /**
  * This handler can be used to submit jobs to a Flink cluster.
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, 
JobSubmitResponseBody, EmptyMessageParameters> {
 
+       private static final String FILE_TYPE_JOB_GRAPH = "JobGraph";
+       private static final String FILE_TYPE_JAR = "Jar";
+       private static final String FILE_TYPE_ARTIFACT = "Artifact";
+
+       private final Executor executor;
+
        public JobSubmitHandler(
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends DispatcherGateway> 
leaderRetriever,
                        Time timeout,
-                       Map<String, String> headers) {
+                       Map<String, String> headers,
+                       Executor executor) {
                super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
+               this.executor = executor;
        }
 
     @Override
     protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
-        JobGraph jobGraph;
-        try {
-            ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
-            jobGraph = (JobGraph) objectIn.readObject();
-        } catch (Exception e) {
+        final Collection<File> uploadedFiles = request.getUploadedFiles();
+        final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
+            File::getName,
+            Path::fromLocalFile
+        ));
+
+        if (uploadedFiles.size() != nameToFile.size()) {
             throw new RestHandlerException(
-                "Failed to deserialize JobGraph.",
-                HttpResponseStatus.BAD_REQUEST,
-                e);
+                String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual: %s",
+                    uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
+                    nameToFile.size(),
+                    uploadedFiles.size()),
+                HttpResponseStatus.BAD_REQUEST
+            );
         }
 
-        return gateway.submitJob(jobGraph, timeout)
-            .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
+        final JobSubmitRequestBody requestBody = request.getRequestBody();
+
+        CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
+
+        Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
+
+        Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
+
+        CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts);
+
+        CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
+
+        return jobSubmissionFuture.thenCombine(jobGraphFuture,
+            (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()))
             .exceptionally(exception -> {
                 throw new CompletionException(new RestHandlerException("Job submission failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, exception));
             });
     }
+
+    private CompletableFuture<JobGraph> loadJobGraph(JobSubmitRequestBody requestBody, Map<String, Path> nameToFile) throws MissingFileException {
+        final Path jobGraphFile = getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_JOB_GRAPH, nameToFile);
+
+        return CompletableFuture.supplyAsync(() -> {
+            JobGraph jobGraph;
+            try (ObjectInputStream objectIn = new ObjectInputStream(jobGraphFile.getFileSystem().open(jobGraphFile))) {
+                jobGraph = (JobGraph) objectIn.readObject();
+            } catch (Exception e) {
+                throw new CompletionException(new RestHandlerException(
+                    "Failed to deserialize JobGraph.",
+                    HttpResponseStatus.BAD_REQUEST,
+                    e));
+            }
+            return jobGraph;
+        }, executor);
+    }
+
+    private static Collection<Path> getJarFilesToUpload(Collection<String> jarFileNames, Map<String, Path> nameToFileMap) throws MissingFileException {
+        Collection<Path> jarFiles = new ArrayList<>(jarFileNames.size());
+        for (String jarFileName : jarFileNames) {
+            Path jarFile = getPathAndAssertUpload(jarFileName, FILE_TYPE_JAR, nameToFileMap);
+            jarFiles.add(new Path(jarFile.toString()));
+        }
+        return jarFiles;
+    }
+
+    private static Collection<Tuple2<String, Path>> getArtifactFilesToUpload(
+            Collection<JobSubmitRequestBody.DistributedCacheFile> artifactEntries,
+            Map<String, Path> nameToFileMap) throws MissingFileException {
+        Collection<Tuple2<String, Path>> artifacts = new ArrayList<>(artifactEntries.size());
+        for (JobSubmitRequestBody.DistributedCacheFile artifactFileName : artifactEntries) {
+            Path artifactFile = getPathAndAssertUpload(artifactFileName.fileName, FILE_TYPE_ARTIFACT, nameToFileMap);
+            artifacts.add(Tuple2.of(artifactFileName.entryName, new Path(artifactFile.toString())));
+        }
+        return artifacts;
+    }
+
+    private CompletableFuture<JobGraph> uploadJobGraphFiles(
+            DispatcherGateway gateway,
+            CompletableFuture<JobGraph> jobGraphFuture,
+            Collection<Path> jarFiles,
+            Collection<Tuple2<String, Path>> artifacts) {
+        CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
+
+        return jobGraphFuture.thenCombine(blobServerPortFuture, (JobGraph jobGraph, Integer blobServerPort) -> {
+            final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
+            try {
+                ClientUtils.uploadJobGraphFiles(jobGraph, jarFiles, artifacts, () -> new BlobClient(address, new Configuration()));
+            } catch (FlinkException e) {
+                throw new CompletionException(new RestHandlerException(
+                    "Could not upload job files.",
+                    HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                    e));
+            }
+            return jobGraph;
+        });
+    }
+
+    private static Path getPathAndAssertUpload(String fileName, String type, Map<String, Path> uploadedFiles) throws MissingFileException {
+        final Path file = uploadedFiles.get(fileName);
+        if (file == null) {
+            throw new MissingFileException(type, fileName);
+        }
+        return file;
+    }
+
+    private static final class MissingFileException extends RestHandlerException {
+
+        private static final long serialVersionUID = -7954810495610194965L;
+
+        MissingFileException(String type, String fileName) {
+            super(type + " file " + fileName + " could not be found on the server.", HttpResponseStatus.BAD_REQUEST);
+        }
+    }
 }

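JobSubmitHandler#loadJobGraph above restores the job graph with an ObjectInputStream over one of the uploaded files, so the client side presumably writes a Java-serialized JobGraph into the file it later references as jobGraphFileName. A minimal sketch of that counterpart; the helper class and the file name job.graph are illustrative only.

    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;

    import org.apache.flink.runtime.jobgraph.JobGraph;

    public class JobGraphFileWriterSketch {

        /**
         * Writes the JobGraph in Java serialization form, matching the
         * ObjectInputStream used by JobSubmitHandler#loadJobGraph.
         */
        public static Path writeJobGraphFile(JobGraph jobGraph, Path targetDirectory) throws IOException {
            // "job.graph" is only an illustrative name; the handler looks the file up
            // by whatever name the request body's jobGraphFileName points to.
            Path jobGraphFile = targetDirectory.resolve("job.graph");
            try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
                out.writeObject(jobGraph);
            }
            return jobGraphFile;
        }
    }
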
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 83fab69..edefa15 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -117,7 +117,7 @@ public abstract class AbstractTaskManagerFileHandler<M 
extends TaskManagerMessag
        }
 
        @Override
-       protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest 
httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway 
gateway) throws RestHandlerException {
+       protected CompletableFuture<Void> 
respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, 
HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) 
throws RestHandlerException {
                final ResourceID taskManagerId = 
handlerRequest.getPathParameter(TaskManagerIdPathParameter.class);
 
                final CompletableFuture<TransientBlobKey> blobKeyFuture;
@@ -152,7 +152,7 @@ public abstract class AbstractTaskManagerFileHandler<M 
extends TaskManagerMessag
                        },
                        ctx.executor());
 
-               resultFuture.whenComplete(
+               return resultFuture.whenComplete(
                        (Void ignored, Throwable throwable) -> {
                                if (throwable != null) {
                                        log.error("Failed to transfer file from 
TaskExecutor {}.", taskManagerId, throwable);

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
deleted file mode 100644
index a845de3..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortHeaders.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * These headers define the protocol for querying the port of the blob server.
- */
-public class BlobServerPortHeaders implements MessageHeaders<EmptyRequestBody, 
BlobServerPortResponseBody, EmptyMessageParameters> {
-
-       private static final String URL = "/blobserver/port";
-       private static final BlobServerPortHeaders INSTANCE = new 
BlobServerPortHeaders();
-
-       private BlobServerPortHeaders() {
-       }
-
-       @Override
-       public Class<EmptyRequestBody> getRequestClass() {
-               return EmptyRequestBody.class;
-       }
-
-       @Override
-       public HttpMethodWrapper getHttpMethod() {
-               return HttpMethodWrapper.GET;
-       }
-
-       @Override
-       public String getTargetRestEndpointURL() {
-               return URL;
-       }
-
-       @Override
-       public Class<BlobServerPortResponseBody> getResponseClass() {
-               return BlobServerPortResponseBody.class;
-       }
-
-       @Override
-       public HttpResponseStatus getResponseStatusCode() {
-               return HttpResponseStatus.OK;
-       }
-
-       @Override
-       public EmptyMessageParameters getUnresolvedMessageParameters() {
-               return EmptyMessageParameters.getInstance();
-       }
-
-       public static BlobServerPortHeaders getInstance() {
-               return INSTANCE;
-       }
-
-       @Override
-       public String getDescription() {
-               return "Returns the port of blob server which can be used to 
upload jars.";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
deleted file mode 100644
index 895ecf3..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseBody.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * Response containing the blob server port.
- */
-public final class BlobServerPortResponseBody implements ResponseBody {
-
-       static final String FIELD_NAME_PORT = "port";
-
-       /**
-        * The port of the blob server.
-        */
-       @JsonProperty(FIELD_NAME_PORT)
-       public final int port;
-
-       @JsonCreator
-       public BlobServerPortResponseBody(
-               @JsonProperty(FIELD_NAME_PORT) int port) {
-
-               this.port = port;
-       }
-
-       @Override
-       public int hashCode() {
-               return 67 * port;
-       }
-
-       @Override
-       public boolean equals(Object object) {
-               if (object instanceof BlobServerPortResponseBody) {
-                       BlobServerPortResponseBody other = 
(BlobServerPortResponseBody) object;
-                       return this.port == other.port;
-               }
-               return false;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
index 88f53f2..42f64b0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages.job;
 
+import org.apache.flink.runtime.rest.FileUploadHandler;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -71,6 +72,14 @@ public class JobSubmitHeaders implements 
MessageHeaders<JobSubmitRequestBody, Jo
 
        @Override
        public String getDescription() {
-               return "Submits a job. This call is primarily intended to be used by the Flink client.";
+               return "Submits a job. This call is primarily intended to be used by the Flink client. This call expects a " +
+                       "multipart/form-data request that consists of file uploads for the serialized JobGraph, jars and " +
+                       "distributed cache artifacts and an attribute named \"" + FileUploadHandler.HTTP_ATTRIBUTE_REQUEST + "\" for " +
+                       "the JSON payload.";
+       }
+
+       @Override
+       public boolean acceptsFileUploads() {
+               return true;
        }
 }

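The description above implies that a submission is a single multipart/form-data request: one form attribute carrying the JSON body, plus one file part per file referenced in that body. A hedged sketch of producing the attribute value with Flink's shaded Jackson; the class name and the file names are invented for the example, and the attribute name itself is taken from FileUploadHandler.HTTP_ATTRIBUTE_REQUEST.

    import java.util.Collections;

    import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

    public class JobSubmitAttributeSketch {

        public static void main(String[] args) throws Exception {
            // The file names below are illustrative; each must match the name of a
            // file part sent in the same multipart request (the handler keys the
            // uploads by File#getName).
            JobSubmitRequestBody body = new JobSubmitRequestBody(
                "job.graph",
                Collections.singletonList("udf.jar"),
                Collections.singletonList(new JobSubmitRequestBody.DistributedCacheFile("words", "words.txt")));

            // The resulting JSON string is what the form attribute named by
            // FileUploadHandler.HTTP_ATTRIBUTE_REQUEST is expected to carry.
            String attributeValue = new ObjectMapper().writeValueAsString(body);
            System.out.println(attributeValue);
        }
    }
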
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
index 3f550f0..0ca82b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
@@ -18,64 +18,119 @@
 
 package org.apache.flink.runtime.rest.messages.job;
 
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 
 /**
  * Request for submitting a job.
  *
- * <p>We currently require the job-jars to be uploaded through the blob-server.
+ * <p>This request only contains the names of files that must be present on 
the server, and defines how these files are
+ * interpreted.
  */
 public final class JobSubmitRequestBody implements RequestBody {
 
-       private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+       private static final String FIELD_NAME_JOB_GRAPH = "jobGraphFileName";
+       private static final String FIELD_NAME_JOB_JARS = "jobJarFileNames";
+       private static final String FIELD_NAME_JOB_ARTIFACTS = 
"jobArtifactFileNames";
 
-       /**
-        * The serialized job graph.
-        */
-       @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
-       public final byte[] serializedJobGraph;
+       @JsonProperty(FIELD_NAME_JOB_GRAPH)
+       public final String jobGraphFileName;
 
-       public JobSubmitRequestBody(JobGraph jobGraph) throws IOException {
-               this(serializeJobGraph(jobGraph));
-       }
+       @JsonProperty(FIELD_NAME_JOB_JARS)
+       public final Collection<String> jarFileNames;
+
+       @JsonProperty(FIELD_NAME_JOB_ARTIFACTS)
+       public final Collection<DistributedCacheFile> artifactFileNames;
 
        @JsonCreator
        public JobSubmitRequestBody(
-                       @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) byte[] 
serializedJobGraph) {
-               this.serializedJobGraph = 
Preconditions.checkNotNull(serializedJobGraph);
+                       @JsonProperty(FIELD_NAME_JOB_GRAPH) String 
jobGraphFileName,
+                       @JsonProperty(FIELD_NAME_JOB_JARS) Collection<String> 
jarFileNames,
+                       @JsonProperty(FIELD_NAME_JOB_ARTIFACTS) 
Collection<DistributedCacheFile> artifactFileNames) {
+               this.jobGraphFileName = jobGraphFileName;
+               this.jarFileNames = jarFileNames;
+               this.artifactFileNames = artifactFileNames;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               JobSubmitRequestBody that = (JobSubmitRequestBody) o;
+               return Objects.equals(jobGraphFileName, that.jobGraphFileName) 
&&
+                       Objects.equals(jarFileNames, that.jarFileNames) &&
+                       Objects.equals(artifactFileNames, 
that.artifactFileNames);
        }
 
        @Override
        public int hashCode() {
-               return 71 * Arrays.hashCode(this.serializedJobGraph);
+               return Objects.hash(jobGraphFileName, jarFileNames, 
artifactFileNames);
        }
 
        @Override
-       public boolean equals(Object object) {
-               if (object instanceof JobSubmitRequestBody) {
-                       JobSubmitRequestBody other = (JobSubmitRequestBody) 
object;
-                       return Arrays.equals(this.serializedJobGraph, 
other.serializedJobGraph);
-               }
-               return false;
+       public String toString() {
+               return "JobSubmitRequestBody{" +
+                       "jobGraphFileName='" + jobGraphFileName + '\'' +
+                       ", jarFileNames=" + jarFileNames +
+                       ", artifactFileNames=" + artifactFileNames +
+                       '}';
        }
 
-       private static byte[] serializeJobGraph(JobGraph jobGraph) throws 
IOException {
-               try (ByteArrayOutputStream baos = new ByteArrayOutputStream(64 
* 1024)) {
-                       ObjectOutputStream out = new ObjectOutputStream(baos);
+       /**
+        * Descriptor for a distributed cache file.
+        */
+       public static class DistributedCacheFile {
+               private static final String FIELD_NAME_ENTRY_NAME = "entryName";
+               private static final String FIELD_NAME_FILE_NAME = "fileName";
+
+               @JsonProperty(FIELD_NAME_ENTRY_NAME)
+               public final String entryName;
+
+               @JsonProperty(FIELD_NAME_FILE_NAME)
+               public final String fileName;
+
+               @JsonCreator
+               public DistributedCacheFile(
+                       @JsonProperty(FIELD_NAME_ENTRY_NAME) String entryName,
+                       @JsonProperty(FIELD_NAME_FILE_NAME) String fileName) {
+                       this.entryName = entryName;
+                       this.fileName = fileName;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       DistributedCacheFile that = (DistributedCacheFile) o;
+                       return Objects.equals(entryName, that.entryName) &&
+                               Objects.equals(fileName, that.fileName);
+               }
 
-                       out.writeObject(jobGraph);
+               @Override
+               public int hashCode() {
+
+                       return Objects.hash(entryName, fileName);
+               }
 
-                       return baos.toByteArray();
+               @Override
+               public String toString() {
+                       return "DistributedCacheFile{" +
+                               "entryName='" + entryName + '\'' +
+                               ", fileName='" + fileName + '\'' +
+                               '}';
                }
        }
 }

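For reference, the JSON accepted for this body uses the field names declared above (jobGraphFileName, jobJarFileNames, jobArtifactFileNames, and entryName/fileName per distributed cache entry). A small round-trip sketch with Flink's shaded Jackson; the concrete file names are illustrative.

    import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

    public class JobSubmitRequestBodyJsonSketch {

        public static void main(String[] args) throws Exception {
            // Field names follow the @JsonProperty constants defined in JobSubmitRequestBody.
            String json = "{"
                + "\"jobGraphFileName\":\"job.graph\","
                + "\"jobJarFileNames\":[\"udf.jar\"],"
                + "\"jobArtifactFileNames\":[{\"entryName\":\"words\",\"fileName\":\"words.txt\"}]"
                + "}";

            JobSubmitRequestBody body = new ObjectMapper().readValue(json, JobSubmitRequestBody.class);
            System.out.println(body);
        }
    }
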
http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
index c5135bf..a538e17 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestConstants.java
@@ -23,6 +23,10 @@ import org.apache.flink.configuration.ConfigConstants;
 /**
  * This class contains constants to be used by rest components.
  */
-public class RestConstants {
+public enum RestConstants {
+       ;
+
        public static final String REST_CONTENT_TYPE = "application/json; 
charset=" + ConfigConstants.DEFAULT_CHARSET.name();
+       public static final String CONTENT_TYPE_JAR = 
"application/java-archive";
+       public static final String CONTENT_TYPE_BINARY = 
"application/octet-stream";
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
index dc14cb1..f151f28 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
@@ -85,7 +85,7 @@ public class ClientUtilsTest extends TestLogger {
                assertEquals(jars.size(), jobGraph.getUserJars().size());
                assertEquals(0, jobGraph.getUserJarBlobKeys().size());
 
-               ClientUtils.uploadJobGraphFiles(jobGraph, () -> new 
BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new 
Configuration()));
+               ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new 
BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new 
Configuration()));
 
                assertEquals(jars.size(), jobGraph.getUserJars().size());
                assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
@@ -124,7 +124,7 @@ public class ClientUtilsTest extends TestLogger {
                assertEquals(totalNumArtifacts, 
jobGraph.getUserArtifacts().size());
                assertEquals(0, 
jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != 
null).count());
 
-               ClientUtils.uploadJobGraphFiles(jobGraph, () -> new 
BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new 
Configuration()));
+               ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new 
BlobClient(new InetSocketAddress("localhost", blobServer.getPort()), new 
Configuration()));
 
                assertEquals(totalNumArtifacts, 
jobGraph.getUserArtifacts().size());
                assertEquals(localArtifacts.size(), 
jobGraph.getUserArtifacts().values().stream().filter(entry -> entry.blobKey != 
null).count());

http://git-wip-us.apache.org/repos/asf/flink/blob/a25cd3fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
index 91fba68..607c1c4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
@@ -81,7 +81,8 @@ public class AbstractHandlerTest extends TestLogger {
                final GatewayRetriever<RestfulGateway> mockGatewayRetriever = 
() ->
                        CompletableFuture.completedFuture(mockRestfulGateway);
 
-               TestHandler handler = new 
TestHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
+               CompletableFuture<Void> requestProcessingCompleteFuture = new 
CompletableFuture<>();
+               TestHandler handler = new 
TestHandler(requestProcessingCompleteFuture, 
CompletableFuture.completedFuture(restAddress), mockGatewayRetriever);
 
                RouteResult<?> routeResult = new RouteResult<>("", "", 
Collections.emptyMap(), Collections.emptyMap(), "");
                HttpRequest request = new DefaultFullHttpRequest(
@@ -101,6 +102,9 @@ public class AbstractHandlerTest extends TestLogger {
 
                handler.respondAsLeader(context, routerRequest, 
mockRestfulGateway);
 
+               // the (asynchronous) request processing is not yet complete so 
the files should still exist
+               Assert.assertTrue(Files.exists(file));
+               requestProcessingCompleteFuture.complete(null);
                Assert.assertFalse(Files.exists(file));
        }
 
@@ -156,14 +160,16 @@ public class AbstractHandlerTest extends TestLogger {
        }
 
        private static class TestHandler extends 
AbstractHandler<RestfulGateway, EmptyRequestBody, EmptyMessageParameters> {
+               private final CompletableFuture<Void> completionFuture;
 
-               protected TestHandler(@Nonnull CompletableFuture<String> 
localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> 
leaderRetriever) {
+               protected TestHandler(CompletableFuture<Void> completionFuture, 
@Nonnull CompletableFuture<String> localAddressFuture, @Nonnull 
GatewayRetriever<? extends RestfulGateway> leaderRetriever) {
                        super(localAddressFuture, leaderRetriever, 
RpcUtils.INF_TIMEOUT, Collections.emptyMap(), TestHeaders.INSTANCE);
+                       this.completionFuture = completionFuture;
                }
 
                @Override
-               protected void respondToRequest(ChannelHandlerContext ctx, 
HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, 
EmptyMessageParameters> handlerRequest, RestfulGateway gateway) throws 
RestHandlerException {
-
+               protected CompletableFuture<Void> 
respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, 
HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest, 
RestfulGateway gateway) throws RestHandlerException {
+                       return completionFuture;
                }
 
                private enum TestHeaders implements 
UntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> {
@@ -188,6 +194,11 @@ public class AbstractHandlerTest extends TestLogger {
                        public String getTargetRestEndpointURL() {
                                return "/test";
                        }
+
+                       @Override
+                       public boolean acceptsFileUploads() {
+                               return true;
+                       }
                }
        }
 }

Reply via email to