Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6147#discussion_r195089986
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
---
@@ -28,35 +33,51 @@
import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.util.ScalaUtils;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import akka.actor.AddressFromURIString;
+import org.slf4j.Logger;
+
import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.ObjectInputStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
/**
* This handler can be used to submit jobs to a Flink cluster.
*/
public final class JobSubmitHandler extends
AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody,
JobSubmitResponseBody, EmptyMessageParameters> {
+ private final Configuration config;
+
public JobSubmitHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<? extends DispatcherGateway>
leaderRetriever,
Time timeout,
- Map<String, String> headers) {
+ Map<String, String> headers,
+ Configuration config) {
super(localRestAddress, leaderRetriever, timeout, headers,
JobSubmitHeaders.getInstance());
+ this.config = config;
}
@Override
protected CompletableFuture<JobSubmitResponseBody>
handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody,
EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws
RestHandlerException {
+ final JobSubmitRequestBody requestBody =
request.getRequestBody();
JobGraph jobGraph;
- try {
- ObjectInputStream objectIn = new ObjectInputStream(new
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
+ try (ObjectInputStream objectIn = new ObjectInputStream(new
ByteArrayInputStream(requestBody.serializedJobGraph))) {
--- End diff --
Should we send the `serializedJobGraph` also as part of the post request
body instead of passing it through jackson to encode it in base64.
---