Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6203#discussion_r199182588
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -315,36 +315,58 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
 		// we have to enable queued scheduling because slot will be allocated lazily
 		jobGraph.setAllowQueuedScheduling(true);
-		log.info("Requesting blob server port.");
-		CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
+		CompletableFuture<JobSubmitResponseBody> submissionFuture = CompletableFuture.supplyAsync(
+			() -> {
+				log.info("Submitting job graph.");
-		CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
-			getDispatcherAddress(),
-			(BlobServerPortResponseBody response, String dispatcherAddress) -> {
-				final int blobServerPort = response.port;
-				final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
+				List<String> jarFileNames = new ArrayList<>(8);
+				List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
+				Collection<FileUpload> filesToUpload = new ArrayList<>(8);
+				// TODO: need configurable location
+				final java.nio.file.Path jobGraphFile;
 				try {
-					ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address, flinkConfig));
-				} catch (Exception e) {
-					throw new CompletionException(e);
+					jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
+					try (OutputStream fileOut = Files.newOutputStream(jobGraphFile)) {
+						try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+							objectOut.writeObject(jobGraph);
+						}
+					}
+					filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
+				} catch (IOException e) {
+					throw new CompletionException("Failed to serialize JobGraph.", e);
 				}
-				return jobGraph;
-			});
-
-		CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose(
-			(JobGraph jobGraphToSubmit) -> {
-				log.info("Submitting job graph.");
+				for (Path jar : jobGraph.getUserJars()) {
+					jarFileNames.add(jar.getName());
+					filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+				}
-				try {
-					return sendRequest(
-						JobSubmitHeaders.getInstance(),
-						new JobSubmitRequestBody(jobGraph));
-				} catch (IOException ioe) {
-					throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe));
+				for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
+					artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName()));
+					filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
 				}
-			});
+
+				final CompletableFuture<JobSubmitResponseBody> submitFuture = sendRetriableRequest(
--- End diff ---
Shall we split up the resource preparation and sending the actual request
into different steps? Then this `Supplier` would be much simpler:
```
CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture =
	CompletableFuture.supplyAsync(() -> ..., executorService);

CompletableFuture<JobSubmitResponseBody> submissionFuture =
	requestFuture.thenCompose(requestBody -> sendRetriableRequest());

submissionFuture.whenComplete((ignoredA, ignoredB) -> cleanupFiles());

return submissionFuture;
```
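
For illustration only, a self-contained sketch of that split could look like the following. This is not the actual `RestClusterClient` code; `PreparedRequest`, `prepareRequest`, `sendRequest`, and `cleanupFiles` are placeholder names standing in for the request preparation, the retriable REST call, and the temp-file cleanup.

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SplitSubmissionSketch {

	// Placeholder for the request body plus the collection of files to upload.
	static class PreparedRequest { }

	// Placeholder for the REST response.
	static class Response { }

	// Step 1: purely local preparation (serialize the job graph, collect jars/artifacts).
	static PreparedRequest prepareRequest() {
		return new PreparedRequest();
	}

	// Step 2: send the prepared request (this would be the retriable REST call).
	static CompletableFuture<Response> sendRequest(PreparedRequest request) {
		return CompletableFuture.completedFuture(new Response());
	}

	// Step 3: delete the temporary files created during preparation.
	static void cleanupFiles(PreparedRequest request) { }

	static CompletableFuture<Response> submit(Executor executor) {
		CompletableFuture<PreparedRequest> requestFuture =
			CompletableFuture.supplyAsync(SplitSubmissionSketch::prepareRequest, executor);

		CompletableFuture<Response> submissionFuture =
			requestFuture.thenCompose(SplitSubmissionSketch::sendRequest);

		// Clean up regardless of whether the submission succeeded or failed.
		submissionFuture.whenComplete(
			(ignoredResponse, ignoredThrowable) ->
				requestFuture.thenAccept(SplitSubmissionSketch::cleanupFiles));

		return submissionFuture;
	}

	public static void main(String[] args) throws Exception {
		ExecutorService executor = Executors.newSingleThreadExecutor();
		System.out.println(submit(executor).get());
		executor.shutdown();
	}
}
```

With this shape the `Supplier` is limited to local file preparation, the send step stays a plain `thenCompose`, and the cleanup hook runs exactly once on both the success and the failure path.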
---