Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4688#discussion_r140226502
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
---
@@ -258,6 +260,23 @@ public void start() throws Exception {
8));
}
+ @Override
+ public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time
timeout) {
+ final int numberJobsRunning = jobManagerRunners.size();
+
+ ArrayList<CompletableFuture<JobDetails>> individualJobDetails =
new ArrayList<>(numberJobsRunning);
+
+ for (JobManagerRunner jobManagerRunner :
jobManagerRunners.values()) {
+
individualJobDetails.add(jobManagerRunner.getJobManagerGateway().requestJobDetails(timeout));
+ }
+
+ CompletableFuture<Collection<JobDetails>> combinedJobDetails =
FutureUtils.combineAll(individualJobDetails);
+
+ return combinedJobDetails.thenApply(
+ (Collection<JobDetails> jobDetails) ->
+ new MultipleJobsDetails(jobDetails.toArray(new
JobDetails[numberJobsRunning]), null));
--- End diff --
`FutureUtils.combineAll` returns a `Arrays.ArrayList`. This type is
directly backed by the provided array. When calling `Arrays.ArrayList#toArray`,
this array will be copied. Thus, I can change `MultipleJobsDetails` to store
the individual `JobDetails` as a list. Then there should be no copy involved.
---