[ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511115#comment-16511115 ]
ASF GitHub Bot commented on FLINK-9280:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6147#discussion_r195067790
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -313,38 +305,32 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
- log.info("Requesting blob server port.");
- CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
-
- CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
- getDispatcherAddress(),
- (BlobServerPortResponseBody response, String dispatcherAddress) -> {
- final int blobServerPort = response.port;
- final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
- final List<PermanentBlobKey> keys;
- try {
- log.info("Uploading jar files.");
- keys = BlobClient.uploadFiles(address, flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
- jobGraph.uploadUserArtifacts(address, flinkConfig);
- } catch (IOException ioe) {
- throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
- }
-
- for (PermanentBlobKey key : keys) {
- jobGraph.addUserJarBlobKey(key);
- }
-
- return jobGraph;
- });
-
- CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose(
- (JobGraph jobGraphToSubmit) -> {
- log.info("Submitting job graph.");
-
+ CompletableFuture<JobSubmitResponseBody> submissionFuture = getWebMonitorBaseUrl()
+ .thenCompose(webMonitorBaseUrl -> {
try {
- return sendRequest(
+ jobGraph.zipUserArtifacts();
+
+ Collection<Path> localUserArtifacts = jobGraph.getUserArtifacts().values().stream()
+ .map(entry -> new Path(entry.filePath))
+ .filter(path -> {
+ try {
+ return !path.getFileSystem().isDistributedFS();
+ } catch (Exception e) {
+ log.warn("Could not determine whether {} is a local file. The file may not be accessible via the Distributed Cache.", path, e);
+ // filesystem isn't accessible from the client or FS class not present
+ return false;
+ }
+ })
+ .collect(Collectors.toList());
+
+ return restClient.sendRequest(
--- End diff --
Let's add retries for this call by adding a `sendRetriableRequest(...)`
with the correct signature. Even better would be to add a `sendRequest(...)`
with the correct signature that dispatches to `sendRetriableRequest`.
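For illustration, the suggested shape could look roughly like the sketch below. This is not the actual `RestClusterClient` code: the class name `RetrySketch`, the `Supplier`-based signatures, the retry count of 3, and the choice to treat only `IOException` as retriable are all assumptions made for the example. It only shows a `sendRequest(...)` overload that always dispatches to a retrying `sendRetriableRequest(...)`.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for the client; none of these signatures come from Flink.
final class RetrySketch {

    /** Re-invokes the request until it succeeds, retries run out, or the failure is not retriable. */
    public static <T> CompletableFuture<T> sendRetriableRequest(
            Supplier<CompletableFuture<T>> request,
            int retriesLeft,
            Predicate<Throwable> isRetriable) {
        CompletableFuture<T> result = new CompletableFuture<>();
        request.get().whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
                return;
            }
            // Dependent stages wrap failures in CompletionException; unwrap before testing.
            Throwable cause = (failure instanceof CompletionException && failure.getCause() != null)
                    ? failure.getCause()
                    : failure;
            if (retriesLeft > 0 && isRetriable.test(cause)) {
                sendRetriableRequest(request, retriesLeft - 1, isRetriable)
                        .whenComplete((v, t) -> {
                            if (t == null) {
                                result.complete(v);
                            } else {
                                result.completeExceptionally(t);
                            }
                        });
            } else {
                result.completeExceptionally(cause);
            }
        });
        return result;
    }

    /** Convenience overload: callers get retries without asking for them (assumed defaults). */
    public static <T> CompletableFuture<T> sendRequest(Supplier<CompletableFuture<T>> request) {
        return sendRetriableRequest(request, 3, t -> t instanceof IOException);
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated request: fails twice with an IOException, then succeeds.
        Supplier<CompletableFuture<String>> flaky = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (attempts.incrementAndGet() < 3) {
                f.completeExceptionally(new IOException("connection refused"));
            } else {
                f.complete("submitted");
            }
            return f;
        };
        System.out.println(sendRequest(flaky).join() + " after " + attempts.get() + " attempts");
    }
}
```

Routing every `sendRequest(...)` call through the retrying variant means a call site cannot accidentally skip retries. A real implementation would presumably also want a delay between attempts, which this sketch leaves out.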
> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
> Issue Type: New Feature
> Components: Job-Submission, REST
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob
> server, sets the blob keys in the job graph, and then uploads this graph to
> the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the
> blob server before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an
> optional list of jar files that were previously uploaded through the
> {{JarUploadHandler}}. If present, the handler would upload these jars to the
> blob server and set the blob keys.