Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6203#discussion_r197450435
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
---
@@ -54,18 +69,89 @@ public JobSubmitHandler(
@Override
protected CompletableFuture<JobSubmitResponseBody>
handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody,
EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws
RestHandlerException {
- JobGraph jobGraph;
- try {
- ObjectInputStream objectIn = new ObjectInputStream(new
ByteArrayInputStream(request.getRequestBody().serializedJobGraph));
- jobGraph = (JobGraph) objectIn.readObject();
- } catch (Exception e) {
- throw new RestHandlerException(
- "Failed to deserialize JobGraph.",
- HttpResponseStatus.BAD_REQUEST,
- e);
+ Collection<Path> uploadedFiles = request.getUploadedFiles();
+ Map<String, Path> nameToFile =
uploadedFiles.stream().collect(Collectors.toMap(
+ path -> path.getFileName().toString(),
+ entry -> entry
+ ));
+
+ JobSubmitRequestBody requestBody = request.getRequestBody();
+
+ Path jobGraphFile =
getPathAndAssertUpload(requestBody.jobGraphFileName, "JobGraph", nameToFile);
+
+ Collection<org.apache.flink.core.fs.Path> jarFiles = new
ArrayList<>(requestBody.jarFileNames.size());
+ for (String jarFileName : requestBody.jarFileNames) {
+ Path jarFile = getPathAndAssertUpload(jarFileName,
"Jar", nameToFile);
--- End diff --
Could we introduce a constant for the `"Jar"` string literal?
---