[ https://issues.apache.org/jira/browse/FLINK-9289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520382#comment-16520382 ]
ASF GitHub Bot commented on FLINK-9289:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6203#discussion_r197452051
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -317,43 +315,51 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
     // we have to enable queued scheduling because slot will be allocated lazily
     jobGraph.setAllowQueuedScheduling(true);
-    log.info("Requesting blob server port.");
-    CompletableFuture<BlobServerPortResponseBody> portFuture = sendRequest(BlobServerPortHeaders.getInstance());
-
-    CompletableFuture<JobGraph> jobUploadFuture = portFuture.thenCombine(
-      getDispatcherAddress(),
-      (BlobServerPortResponseBody response, String dispatcherAddress) -> {
-        final int blobServerPort = response.port;
-        final InetSocketAddress address = new InetSocketAddress(dispatcherAddress, blobServerPort);
+    CompletableFuture<JobSubmitResponseBody> submissionFuture = CompletableFuture.supplyAsync(
+      () -> {
+        log.info("Submitting job graph.");
-        List<Path> userJars = jobGraph.getUserJars();
         Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = jobGraph.getUserArtifacts();
-        if (!userJars.isEmpty() || !userArtifacts.isEmpty()) {
-          try (BlobClient client = new BlobClient(address, flinkConfig)) {
-            log.info("Uploading jar files.");
-            ClientUtils.uploadAndSetUserJars(jobGraph, client);
-            log.info("Uploading jar artifacts.");
-            ClientUtils.uploadAndSetUserArtifacts(jobGraph, client);
-          } catch (IOException ioe) {
-            throw new CompletionException(new FlinkException("Could not upload job files.", ioe));
+
+        List<String> jarFileNames = new ArrayList<>(8);
+        List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
+        Collection<FileUpload> filesToUpload = new ArrayList<>(8);
+
+        // TODO: need configurable location
+        final String jobGraphFileName;
+        try {
+          final java.nio.file.Path tempFile = Files.createTempFile("flink-jobgraph", ".bin");
+          try (OutputStream fileOut = Files.newOutputStream(tempFile)) {
+            try (ObjectOutputStream objectOut = new ObjectOutputStream(fileOut)) {
+              objectOut.writeObject(jobGraph);
+            }
          }
+          filesToUpload.add(new FileUpload(tempFile, RestConstants.CONTENT_TYPE_BINARY));
+          jobGraphFileName = tempFile.getFileName().toString();
+        } catch (IOException e) {
+          throw new RuntimeException("lol", e);
        }
-        return jobGraph;
-      });
-
-    CompletableFuture<JobSubmitResponseBody> submissionFuture = jobUploadFuture.thenCompose(
-      (JobGraph jobGraphToSubmit) -> {
-        log.info("Submitting job graph.");
+        for (Path jar : jobGraph.getUserJars()) {
+          jarFileNames.add(jar.getName());
+          filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
+        }
-        try {
-          return sendRequest(
-            JobSubmitHeaders.getInstance(),
-            new JobSubmitRequestBody(jobGraph));
-        } catch (IOException ioe) {
-          throw new CompletionException(new FlinkException("Could not create JobSubmitRequestBody.", ioe));
+        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
+          artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), artifacts.getValue().filePath));
+          filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
        }
-      });
+
+        return sendRetriableRequest(
+          JobSubmitHeaders.getInstance(),
+          EmptyMessageParameters.getInstance(),
+          new JobSubmitRequestBody(
            jobGraphFileName,
            jarFileNames,
            artifactFileNames),
+          filesToUpload,
+          isConnectionProblemOrServiceUnavailable());
--- End diff ---
I think it would be better to clean up the generated `JobGraph` file after
we've sent the request.
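A minimal sketch of how such cleanup could look (not the actual change in the PR; it assumes the temporary file path is kept in a variable, here called `tempFile`, that is still in scope where `submissionFuture` is available):
{code}
// Sketch only: delete the serialized JobGraph file once the submission
// request has completed, whether it succeeded or failed.
submissionFuture.whenComplete((response, throwable) -> {
    try {
        Files.deleteIfExists(tempFile);
    } catch (IOException e) {
        log.warn("Could not delete temporary JobGraph file {}.", tempFile, e);
    }
});
{code}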
> Parallelism of generated operators should have max parallelism of input
> ---------------------------------------------------------------------
>
> Key: FLINK-9289
> URL: https://issues.apache.org/jira/browse/FLINK-9289
> Project: Flink
> Issue Type: Bug
> Components: DataSet API
> Affects Versions: 1.5.0, 1.4.2, 1.6.0
> Reporter: Fabian Hueske
> Assignee: Xingcan Cui
> Priority: Major
> Labels: pull-request-available
>
> The DataSet API aims to chain generated operators such as key extraction
> mappers to their predecessor. This is done by assigning the same parallelism
> as the input operator.
> If a generated operator has more than two inputs, the operator cannot be
> chained anymore and the operator is generated with default parallelism. This
> can lead to a {code}NoResourceAvailableException: Not enough free slots
> available to run the job.{code} as reported by a user on the mailing list:
> https://lists.apache.org/thread.html/60a8bffcce54717b6273bf3de0f43f1940fbb711590f4b90cd666c9a@%3Cuser.flink.apache.org%3E
> I suggest setting the parallelism of a generated operator to the maximum
> parallelism of all of its inputs to fix this problem.
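> A minimal sketch of that rule, purely illustrative ({{inputs}}, {{generatedOperator}}, and {{env}} are hypothetical names, not the actual plan translation code):
> {code}
> // Use the maximum parallelism of all inputs; fall back to the environment's
> // default parallelism if the inputs only carry the default value (-1).
> int parallelism = inputs.stream()
>     .mapToInt(input -> input.getParallelism())
>     .max()
>     .orElse(env.getParallelism());
> if (parallelism <= 0) {
>     parallelism = env.getParallelism();
> }
> generatedOperator.setParallelism(parallelism);
> {code}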
> Until the problem is fixed, a workaround is to set the default parallelism on
> the {{ExecutionEnvironment}}:
> {code}
> ExecutionEnvironment env = ...
> env.setParallelism(2);
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)