[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511231#comment-16511231
 ] 

ASF GitHub Bot commented on FLINK-9280:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6147#discussion_r195101806
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
    @@ -91,19 +111,49 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
     
                        while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
                                final InterfaceHttpData data = 
currentHttpPostRequestDecoder.next();
    -                           if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
    -                                   final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
    -                                   checkState(fileUpload.isCompleted());
    -
    -                                   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -                                           "_" + 
fileUpload.getFilename()));
    -                                   fileUpload.renameTo(dest.toFile());
    -                                   
ctx.channel().attr(UPLOADED_FILE).set(dest);
    +                           if 
(currentHttpRequest.getUri().equals(JobSubmitHeaders.getInstance().getTargetRestEndpointURL()))
 {
    +                                   if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
    +                                           final DiskFileUpload fileUpload 
= (DiskFileUpload) data;
    +                                           
checkState(fileUpload.isCompleted());
    +                                           LOG.trace("Received job-submit 
file upload. attribute:{} fileName:{}.", fileUpload.getName(), 
fileUpload.getFilename());
    +
    +                                           Path dest;
    +                                           if 
(data.getName().startsWith(HTTP_ATTRIBUTE_JARS)) {
    +                                                   dest = 
currentJobSubmitRequestBuffer.getJarDir().resolve(fileUpload.getFilename());
    +                                                   
fileUpload.renameTo(dest.toFile());
    +                                                   
currentJobSubmitRequestBuffer.addJar(fileUpload.getFile().toPath());
    +                                           } else if 
(data.getName().startsWith(HTTP_ATTRIBUTE_ARTIFACTS)) {
    +                                                   dest = 
currentJobSubmitRequestBuffer.getArtifactDir().resolve(fileUpload.getFilename());
    +                                                   
fileUpload.renameTo(dest.toFile());
    +                                                   
currentJobSubmitRequestBuffer.addUserArtifact(fileUpload.getFile().toPath());
    +                                           } else {
    +                                                   LOG.warn("Received 
unexpected FileUpload that will be ignored. attribute:{} fileName:{}.", 
data.getName(), fileUpload.getFilename());
    +                                                   fileUpload.delete();
    +                                           }
    +                                   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
    +                                           final Attribute request = 
(Attribute) data;
    +                                           final byte[] requestJson = 
request.get();
    +                                           JobSubmitRequestBody 
jobSubmitRequestBody = 
RestMapperUtils.getStrictObjectMapper().readValue(requestJson, 
JobSubmitHeaders.getInstance().getRequestClass());
    +                                           
currentJobSubmitRequestBuffer.setJobGraph(jobSubmitRequestBody.serializedJobGraph);
    +                                   }
    +                           } else {
    +                                   if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
    +                                           final DiskFileUpload fileUpload 
= (DiskFileUpload) data;
    +                                           
checkState(fileUpload.isCompleted());
    +
    +                                           final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
    +                                                   "_" + 
fileUpload.getFilename()));
    +                                           
fileUpload.renameTo(dest.toFile());
    +                                           
ctx.channel().attr(UPLOADED_FILE).set(dest);
    +                                   }
                                }
                                data.release();
                        }
     
                        if (httpContent instanceof LastHttpContent) {
    +                           if (currentJobSubmitRequestBuffer != null) {
    +                                   
ctx.channel().attr(SUBMITTED_JOB).set(currentJobSubmitRequestBuffer.get());
    +                           }
    --- End diff --
    
    Instead of setting the `SUBMITTED_JOB` attribute I think it could be enough 
to set the set of uploaded files as an attribute and then send the json payload 
to the downstream handler (`AbstractHandler`). Then we would not need to 
construct the `JobSubmitRequestBodyBuffer` as an intermediate helper structure.


> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
>                 Key: FLINK-9280
>                 URL: https://issues.apache.org/jira/browse/FLINK-9280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Job-Submission, REST
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Critical
>             Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all require jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> The {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the the {{JobSubmitHandler}} to also accept an 
> optional list of jar files, that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to