[
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531618#comment-16531618
]
ASF GitHub Bot commented on FLINK-9280:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6203#discussion_r199867852
--- Diff:
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
---
@@ -315,36 +315,61 @@ public JobSubmissionResult submitJob(JobGraph
jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be
allocated lazily
jobGraph.setAllowQueuedScheduling(true);
- log.info("Requesting blob server port.");
- CompletableFuture<BlobServerPortResponseBody> portFuture =
sendRequest(BlobServerPortHeaders.getInstance());
+ CompletableFuture<java.nio.file.Path> jobGraphFileFuture =
CompletableFuture.supplyAsync(() -> {
+ try {
+ final java.nio.file.Path jobGraphFile =
Files.createTempFile("flink-jobgraph", ".bin");
+ try (ObjectOutputStream objectOut = new
ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
+ objectOut.writeObject(jobGraph);
+ }
+ return jobGraphFile;
+ } catch (IOException e) {
+ throw new CompletionException(new
FlinkException("Failed to serialize JobGraph.", e));
+ }
+ }, executorService);
- CompletableFuture<JobGraph> jobUploadFuture =
portFuture.thenCombine(
- getDispatcherAddress(),
- (BlobServerPortResponseBody response, String
dispatcherAddress) -> {
- final int blobServerPort = response.port;
- final InetSocketAddress address = new
InetSocketAddress(dispatcherAddress, blobServerPort);
+ CompletableFuture<Tuple2<JobSubmitRequestBody,
Collection<FileUpload>>> requestFuture =
jobGraphFileFuture.thenApply(jobGraphFile -> {
+ List<String> jarFileNames = new ArrayList<>(8);
+ List<JobSubmitRequestBody.DistributedCacheFile>
artifactFileNames = new ArrayList<>(8);
+ Collection<FileUpload> filesToUpload = new
ArrayList<>(8);
- try {
-
ClientUtils.uploadJobGraphFiles(jobGraph, () -> new BlobClient(address,
flinkConfig));
- } catch (Exception e) {
- throw new CompletionException(e);
- }
+ filesToUpload.add(new FileUpload(jobGraphFile,
RestConstants.CONTENT_TYPE_BINARY));
- return jobGraph;
- });
+ for (Path jar : jobGraph.getUserJars()) {
+ jarFileNames.add(jar.getName());
+ filesToUpload.add(new
FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+ }
- CompletableFuture<JobSubmitResponseBody> submissionFuture =
jobUploadFuture.thenCompose(
- (JobGraph jobGraphToSubmit) -> {
- log.info("Submitting job graph.");
+ for (Map.Entry<String,
DistributedCache.DistributedCacheEntry> artifacts :
jobGraph.getUserArtifacts().entrySet()) {
+ artifactFileNames.add(new
JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new
Path(artifacts.getValue().filePath).getName()));
+ filesToUpload.add(new
FileUpload(Paths.get(artifacts.getValue().filePath),
RestConstants.CONTENT_TYPE_BINARY));
+ }
- try {
- return sendRequest(
- JobSubmitHeaders.getInstance(),
- new
JobSubmitRequestBody(jobGraph));
- } catch (IOException ioe) {
- throw new CompletionException(new
FlinkException("Could not create JobSubmitRequestBody.", ioe));
- }
- });
+ final JobSubmitRequestBody requestBody = new
JobSubmitRequestBody(
+ jobGraphFile.getFileName().toString(),
+ jarFileNames,
+ artifactFileNames);
+
+ return Tuple2.of(requestBody,
Collections.unmodifiableCollection(filesToUpload));
+ });
+
+ final CompletableFuture<JobSubmitResponseBody> submissionFuture
= requestFuture.thenCompose(
+ requestAndFileUploads -> sendRetriableRequest(
+ JobSubmitHeaders.getInstance(),
+ EmptyMessageParameters.getInstance(),
+ requestAndFileUploads.f0,
+ requestAndFileUploads.f1,
+ isConnectionProblemOrServiceUnavailable())
+ );
+
+ submissionFuture
+ .thenCombine(jobGraphFileFuture, (ignored,
jobGraphFile) -> jobGraphFile)
+ .thenAccept(jobGraphFile -> {
--- End diff --
Ah right, I did it like this so I don't have to return anything in
thenCombine(). I could of course return `null`, but that doesn't seem right,
and returning anything else isn't necessary.
@tillrohrmann What do you think?
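For readers following along, the chaining pattern under discussion can be sketched in isolation. This is a minimal, self-contained illustration using only `java.util.concurrent.CompletableFuture`, not the actual `RestClusterClient` code: the class name `TempFileCleanupSketch` and the helpers `writeTempFile`, `submit` and `cleanUpAfter` are hypothetical, and deleting the temporary file inside `thenAccept` is an assumption, since the diff above is cut off before the body of that lambda.

```java
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;

class TempFileCleanupSketch {

    // Hypothetical stand-in for serializing the JobGraph into a temporary file.
    static CompletableFuture<Path> writeTempFile(String payload) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Path file = Files.createTempFile("sketch-jobgraph", ".bin");
                try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
                    out.writeObject(payload);
                }
                return file;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    // Hypothetical stand-in for the REST submission; it completes immediately.
    static CompletableFuture<String> submit(Path file) {
        return CompletableFuture.completedFuture("submitted:" + file.getFileName());
    }

    // The pattern from the diff: thenCombine() discards the submission result and
    // passes the file on, so the combining function returns something meaningful
    // instead of null. thenAccept() then consumes the file (assumed here: deletes it).
    static <T> CompletableFuture<Void> cleanUpAfter(
            CompletableFuture<T> submissionFuture,
            CompletableFuture<Path> fileFuture) {
        return submissionFuture
            .thenCombine(fileFuture, (ignored, file) -> file)
            .thenAccept(file -> {
                try {
                    Files.deleteIfExists(file);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
    }
}
```

Note that `thenCombine()` only invokes its function when both futures complete normally, so in this sketch the file is left in place if the submission fails.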
> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
> Issue Type: New Feature
> Components: Job-Submission, REST
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob
> server, sets the blob keys in the jobgraph, and then uploads this graph to
> the {{JobSubmitHandler}} which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an
> optional list of jar files that were previously uploaded through the
> {{JarUploadHandler}}. If present, the handler would upload these jars to the
> blobserver and set the blob keys.