[
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511121#comment-16511121
]
ASF GitHub Bot commented on FLINK-9280:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6147#discussion_r195077673
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -197,6 +208,88 @@ public void shutdown(Time timeout) {
executor);
}
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters,
R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request,
+ Collection<Path> jars,
+ Collection<Path> userArtifacts) throws IOException {
+ Preconditions.checkNotNull(targetAddress);
+ Preconditions.checkArgument(0 <= targetPort && targetPort <
65536, "The target port " + targetPort + " is not in the range (0, 65536].");
+ Preconditions.checkNotNull(messageHeaders);
+ Preconditions.checkNotNull(request);
+ Preconditions.checkNotNull(messageParameters);
+ Preconditions.checkState(messageParameters.isResolved(),
"Message parameters were not resolved.");
+
+ String targetUrl =
MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(),
messageParameters);
+
+ LOG.debug("Sending request of class {} to {}:{}{}",
request.getClass(), targetAddress, targetPort, targetUrl);
+ // serialize payload
+ StringWriter sw = new StringWriter();
+ objectMapper.writeValue(sw, request);
+ ByteBuf payload =
Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+ // do not load file into memory, this can have weird
side-effects and break functionality
+ HttpDataFactory factory = new DefaultHttpDataFactory(true);
+
+ HttpRequest httpRequest = new
DefaultHttpRequest(HttpVersion.HTTP_1_1,
messageHeaders.getHttpMethod().getNettyHttpMethod(),
messageHeaders.getTargetRestEndpointURL());
+ httpRequest.headers()
+ .set(HttpHeaders.Names.HOST, targetAddress + ':' +
targetPort)
+ .set(HttpHeaders.Names.CONNECTION,
HttpHeaders.Values.CLOSE);
+
+ // takes care of splitting the request into multiple parts
+ HttpPostRequestEncoder bodyRequestEncoder;
+ try {
+ bodyRequestEncoder = new
HttpPostRequestEncoder(factory, httpRequest, true);
+
+ Attribute requestAttribute = new
MemoryAttribute(FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+ requestAttribute.setContent(payload);
+ bodyRequestEncoder.addBodyHttpData(requestAttribute);
+
+ addPathsToEncoder(jars,
FileUploadHandler.HTTP_ATTRIBUTE_JARS, RestConstants.JAR_CONTENT_TYPE,
bodyRequestEncoder);
+ addPathsToEncoder(userArtifacts,
FileUploadHandler.HTTP_ATTRIBUTE_ARTIFACTS, RestConstants.BINARY_CONTENT_TYPE,
bodyRequestEncoder);
--- End diff --
If we can send arbitrary files to the server and let the respective handler
make sense of what is in what file, then we would also not need to introduce
the different attributes.
> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
> Issue Type: New Feature
> Components: Job-Submission, REST
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all require jars to the blob
> server, sets the blob keys in the jobgraph, and then uploads this graph to
> The {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the the {{JobSubmitHandler}} to also accept an
> optional list of jar files, that were previously uploaded through the
> {{JarUploadHandler}}. If present, the handler would upload these jars to the
> blobserver and set the blob keys.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)