[
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511232#comment-16511232
]
ASF GitHub Bot commented on FLINK-9280:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6147#discussion_r195099673
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
---
@@ -91,19 +111,49 @@ protected void channelRead0(final
ChannelHandlerContext ctx, final HttpObject ms
while (httpContent !=
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
final InterfaceHttpData data =
currentHttpPostRequestDecoder.next();
- if (data.getHttpDataType() ==
InterfaceHttpData.HttpDataType.FileUpload) {
- final DiskFileUpload fileUpload =
(DiskFileUpload) data;
- checkState(fileUpload.isCompleted());
-
- final Path dest =
uploadDir.resolve(Paths.get(UUID.randomUUID() +
- "_" +
fileUpload.getFilename()));
- fileUpload.renameTo(dest.toFile());
-
ctx.channel().attr(UPLOADED_FILE).set(dest);
+ if
(currentHttpRequest.getUri().equals(JobSubmitHeaders.getInstance().getTargetRestEndpointURL()))
{
+ if (data.getHttpDataType() ==
InterfaceHttpData.HttpDataType.FileUpload) {
+ final DiskFileUpload fileUpload
= (DiskFileUpload) data;
+
checkState(fileUpload.isCompleted());
+ LOG.trace("Received job-submit
file upload. attribute:{} fileName:{}.", fileUpload.getName(),
fileUpload.getFilename());
+
+ Path dest;
+ if
(data.getName().startsWith(HTTP_ATTRIBUTE_JARS)) {
+ dest =
currentJobSubmitRequestBuffer.getJarDir().resolve(fileUpload.getFilename());
+
fileUpload.renameTo(dest.toFile());
+
currentJobSubmitRequestBuffer.addJar(fileUpload.getFile().toPath());
+ } else if
(data.getName().startsWith(HTTP_ATTRIBUTE_ARTIFACTS)) {
+ dest =
currentJobSubmitRequestBuffer.getArtifactDir().resolve(fileUpload.getFilename());
+
fileUpload.renameTo(dest.toFile());
+
currentJobSubmitRequestBuffer.addUserArtifact(fileUpload.getFile().toPath());
+ } else {
+ LOG.warn("Received
unexpected FileUpload that will be ignored. attribute:{} fileName:{}.",
data.getName(), fileUpload.getFilename());
+ fileUpload.delete();
+ }
+ } else if (data.getHttpDataType() ==
InterfaceHttpData.HttpDataType.Attribute) {
+ final Attribute request =
(Attribute) data;
+ final byte[] requestJson =
request.get();
+ JobSubmitRequestBody
jobSubmitRequestBody =
RestMapperUtils.getStrictObjectMapper().readValue(requestJson,
JobSubmitHeaders.getInstance().getRequestClass());
+
currentJobSubmitRequestBuffer.setJobGraph(jobSubmitRequestBody.serializedJobGraph);
+ }
--- End diff --
I think we are mixing here a lot of handler specific knowledge into this
handler and thereby creating a very strong coupling between multiple
components. Moreover, this handler seems to deserialize json which is rather
the responsibility of the `AbstractHandler`.
> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
> Issue Type: New Feature
> Components: Job-Submission, REST
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all require jars to the blob
> server, sets the blob keys in the jobgraph, and then uploads this graph to
> The {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the the {{JobSubmitHandler}} to also accept an
> optional list of jar files, that were previously uploaded through the
> {{JarUploadHandler}}. If present, the handler would upload these jars to the
> blobserver and set the blob keys.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)