[ 
https://issues.apache.org/jira/browse/FLINK-9289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520381#comment-16520381
 ] 

ASF GitHub Bot commented on FLINK-9289:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6203#discussion_r197451641
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
 ---
    @@ -54,18 +69,89 @@ public JobSubmitHandler(
     
        @Override
        protected CompletableFuture<JobSubmitResponseBody> 
handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, 
EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws 
RestHandlerException {
    -           JobGraph jobGraph;
    -           try {
    -                   ObjectInputStream objectIn = new ObjectInputStream(new 
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
    -                   jobGraph = (JobGraph) objectIn.readObject();
    -           } catch (Exception e) {
    -                   throw new RestHandlerException(
    -                           "Failed to deserialize JobGraph.",
    -                           HttpResponseStatus.BAD_REQUEST,
    -                           e);
    +           Collection<Path> uploadedFiles = request.getUploadedFiles();
    +           Map<String, Path> nameToFile = 
uploadedFiles.stream().collect(Collectors.toMap(
    +                   path -> path.getFileName().toString(),
    +                   entry -> entry
    +           ));
    +
    +           JobSubmitRequestBody requestBody = request.getRequestBody();
    +
    +           Path jobGraphFile = 
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
    +
    +           Collection<org.apache.flink.core.fs.Path> jarFiles = new 
ArrayList<>(requestBody.jarFileNames.size());
    +           for (String jarFileName : requestBody.jarFileNames) {
    +                   Path jarFile = getPathAndAssertUpload(jarFileName, 
"Jar", nameToFile);
    +                   jarFiles.add(new 
org.apache.flink.core.fs.Path(jarFile.toString()));
    +           }
    +
    +           Collection<Tuple2<String, org.apache.flink.core.fs.Path>> 
artifacts = new ArrayList<>(requestBody.artifactFileNames.size());
    +           for (JobSubmitRequestBody.DistributedCacheFile artifactFileName 
: requestBody.artifactFileNames) {
    +                   Path artifactFile = 
getPathAndAssertUpload(artifactFileName.fileName, "Artifact", nameToFile);
    +                   artifacts.add(Tuple2.of(artifactFileName.entryName, new 
org.apache.flink.core.fs.Path(artifactFile.toString())));
                }
     
    -           return gateway.submitJob(jobGraph, timeout)
    -                   .thenApply(ack -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
    +           Map<String, DistributedCache.DistributedCacheEntry> 
temporaryHack = artifacts.stream()
    +                   .collect(Collectors.toMap(
    +                           tuple -> tuple.f0,
    +                           // the actual entry definition is mostly 
irrelevant as only the blobkey is accessed
    +                           // blame whoever wrote the ClientUtils API
    +                           tuple -> new 
DistributedCache.DistributedCacheEntry(tuple.f1.toString(), false)
    +                   ));
    +
    +           // TODO: use executor
    +           CompletableFuture<JobGraph> jobGraphFuture = 
CompletableFuture.supplyAsync(() -> {
    +                   JobGraph jobGraph;
    +                   try (ObjectInputStream objectIn = new 
ObjectInputStream(Files.newInputStream(jobGraphFile))) {
    +                           jobGraph = (JobGraph) objectIn.readObject();
    +                   } catch (Exception e) {
    +                           throw new CompletionException(new 
RestHandlerException(
    +                                   "Failed to deserialize JobGraph.",
    +                                   HttpResponseStatus.BAD_REQUEST,
    +                                   e));
    +                   }
    +                   return jobGraph;
    +           });
    +
    +           CompletableFuture<Integer> blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
    +
    +           CompletableFuture<JobGraph> finalizedJobGraphFuture = 
jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
    +                   final InetSocketAddress address = new 
InetSocketAddress(gateway.getHostname(), blobServerPort);
    +                   try (BlobClient blobClient = new BlobClient(address, 
new Configuration())) {
    +                           Collection<PermanentBlobKey> jarBlobKeys = 
ClientUtils.uploadUserJars(jobGraph.getJobID(), jarFiles, blobClient);
    +                           ClientUtils.setUserJarBlobKeys(jarBlobKeys, 
jobGraph);
    +
    +                           Collection<Tuple2<String, PermanentBlobKey>> 
artifactBlobKeys = ClientUtils.uploadUserArtifacts(jobGraph.getJobID(), 
temporaryHack, blobClient);
    +                           ClientUtils.setUserArtifactBlobKeys(jobGraph, 
artifactBlobKeys);
    +                   } catch (IOException e) {
    +                           throw new CompletionException(new 
RestHandlerException(
    +                                   "Could not upload job files.",
    +                                   
HttpResponseStatus.INTERNAL_SERVER_ERROR,
    +                                   e));
    +                   }
    +                   return jobGraph;
    +           });
    +
    +           CompletableFuture<Acknowledge> jobSubmissionFuture = 
finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, 
timeout));
    +
    +           return jobSubmissionFuture.thenCombine(jobGraphFuture,
    +                   (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + 
jobGraph.getJobID()));
    +   }
    --- End diff --
    
    This method is quite long. We could think about moving the generation of 
the `jarFiles` and `temporaryHack` structures into individual methods.


> Parallelism of generated operators should have max parallism of input
> ---------------------------------------------------------------------
>
>                 Key: FLINK-9289
>                 URL: https://issues.apache.org/jira/browse/FLINK-9289
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.5.0, 1.4.2, 1.6.0
>            Reporter: Fabian Hueske
>            Assignee: Xingcan Cui
>            Priority: Major
>              Labels: pull-request-available
>
> The DataSet API aims to chain generated operators such as key extraction 
> mappers to their predecessor. This is done by assigning the same parallelism 
> as the input operator.
> If a generated operator has more than two inputs, the operator cannot be 
> chained anymore and the operator is generated with default parallelism. This 
> can lead to a {code}NoResourceAvailableException: Not enough free slots 
> available to run the job.{code} as reported by a user on the mailing list: 
> https://lists.apache.org/thread.html/60a8bffcce54717b6273bf3de0f43f1940fbb711590f4b90cd666c9a@%3Cuser.flink.apache.org%3E
> I suggest to set the parallelism of a generated operator to the max 
> parallelism of all of its inputs to fix this problem.
> Until the problem is fixed, a workaround is to set the default parallelism at 
> the {{ExecutionEnvironment}}:
> {code}
> ExecutionEnvironment env = ...
> env.setParallelism(2);
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to