GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r199189017
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
    @@ -34,38 +40,137 @@
     
     import javax.annotation.Nonnull;
     
    -import java.io.ByteArrayInputStream;
    +import java.io.File;
    +import java.io.IOException;
     import java.io.ObjectInputStream;
    +import java.net.InetSocketAddress;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.ArrayList;
    +import java.util.Collection;
     import java.util.Map;
     import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +import java.util.stream.Collectors;
     
     /**
      * This handler can be used to submit jobs to a Flink cluster.
      */
     public final class JobSubmitHandler extends 
AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, 
JobSubmitResponseBody, EmptyMessageParameters> {
     
    +   private static final String FILE_TYPE_GRAPH = "JobGraph";
    +   private static final String FILE_TYPE_JAR = "Jar";
    +   private static final String FILE_TYPE_ARTIFACT = "Artifact";
    +
    +   private final Executor executor;
    +
        public JobSubmitHandler(
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends DispatcherGateway> 
leaderRetriever,
                        Time timeout,
    -                   Map<String, String> headers) {
    +                   Map<String, String> headers,
    +                   Executor executor) {
                super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
    +           this.executor = executor;
        }
     
        @Override
        protected CompletableFuture<JobSubmitResponseBody> 
handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, 
EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
    -           JobGraph jobGraph;
    -           try {
    -                   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
    -                   jobGraph = (JobGraph) objectIn.readObject();
    -           } catch (Exception e) {
    +           Collection<File> uploadedFiles = request.getUploadedFiles();
    +           Map<String, Path> nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
    +                   path -> path.toPath().getFileName().toString(),
    +                   File::toPath
    +           ));
    +
    +           if (uploadedFiles.size() != nameToFile.size()) {
                        throw new RestHandlerException(
    -                           "Failed to deserialize JobGraph.",
    -                           HttpResponseStatus.BAD_REQUEST,
    -                           e);
    +                           String.format("The number of uploaded files was 
%s than the expected count. Expected: %s Actual %s",
    +                                   uploadedFiles.size() < 
nameToFile.size() ? "lower" : "higher",
    +                                   nameToFile.size(),
    +                                   uploadedFiles.size()),
    +                           HttpResponseStatus.BAD_REQUEST
    +                   );
    +           }
    +
    +           JobSubmitRequestBody requestBody = request.getRequestBody();
    +
    +           Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_GRAPH, 
nameToFile);
    +
    +           CompletableFuture<JobGraph> jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
    +                   JobGraph jobGraph;
    +                   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
    +                           jobGraph = (JobGraph) objectIn.readObject();
    +                   } catch (Exception e) {
    +                           throw new CompletionException(new 
RestHandlerException(
    +                                   "Failed to deserialize JobGraph.",
    +                                   HttpResponseStatus.BAD_REQUEST,
    +                                   e));
    +                   }
    +                   return jobGraph;
    +           }, executor);
    +
    +           Collection<org.apache.flink.core.fs.Path> jarFiles = 
getJarFilesToUpload(nameToFile, requestBody.jarFileNames);
    +
    +           Collection<Tuple2<String, org.apache.flink.core.fs.Path>> 
artifacts = getArtifactFilesToUpload(nameToFile, requestBody.artifactFileNames);
    +
    +           CompletableFuture<Integer> blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
    +
    +           CompletableFuture<JobGraph> finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
    +                   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
    +                   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())){
    +                           Collection<PermanentBlobKey> jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
    +                           ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
    +                           Collection<Tuple2<String, PermanentBlobKey>> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
artifacts, blobClient);
    +                           ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
    --- End diff --
    
    This code snippet looks very much like `ClientUtils#uploadJobGraphFiles`. 
Maybe we could refactor that method so that it can be reused here.


---

Reply via email to