Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4688#discussion_r140226502

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
@@ -258,6 +260,23 @@ public void start() throws Exception {
                 8));
     }
 
+    @Override
+    public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {
+        final int numberJobsRunning = jobManagerRunners.size();
+
+        ArrayList<CompletableFuture<JobDetails>> individualJobDetails = new ArrayList<>(numberJobsRunning);
+
+        for (JobManagerRunner jobManagerRunner : jobManagerRunners.values()) {
+            individualJobDetails.add(jobManagerRunner.getJobManagerGateway().requestJobDetails(timeout));
+        }
+
+        CompletableFuture<Collection<JobDetails>> combinedJobDetails = FutureUtils.combineAll(individualJobDetails);
+
+        return combinedJobDetails.thenApply(
+            (Collection<JobDetails> jobDetails) ->
+                new MultipleJobsDetails(jobDetails.toArray(new JobDetails[numberJobsRunning]), null));
--- End diff --

`FutureUtils.combineAll` returns an `Arrays.ArrayList`. This type is directly backed by the provided array. Calling `Arrays.ArrayList#toArray` copies that array. Thus, I can change `MultipleJobsDetails` to store the individual `JobDetails` as a list. Then there should be no copy involved.
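
To illustrate the copy being discussed, here is a minimal sketch using only the JDK. It assumes the collection behaves like the list returned by `Arrays.asList`; the class and method names (`ToArrayCopyDemo`, `toArrayCopies`, `viewIsBacked`) are hypothetical and not part of Flink.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not Flink code: shows that a list created by
// Arrays.asList is a view on its array, while toArray(T[]) yields a copy.
public class ToArrayCopyDemo {

    /** Returns true if toArray(T[]) hands back an array other than the backing one. */
    static boolean toArrayCopies(String[] backing) {
        List<String> view = Arrays.asList(backing);
        // The elements are copied into the freshly allocated target array.
        String[] result = view.toArray(new String[backing.length]);
        return result != backing;
    }

    /** Returns true if a write to the backing array is visible through the list view. */
    static boolean viewIsBacked(String[] backing) {
        List<String> view = Arrays.asList(backing);
        backing[0] = "changed";
        return "changed".equals(view.get(0));
    }

    public static void main(String[] args) {
        System.out.println(toArrayCopies(new String[] {"job-a", "job-b"})); // prints true
        System.out.println(viewIsBacked(new String[] {"job-a", "job-b"}));  // prints true
    }
}
```

Passing the `List` (or `Collection`) through unchanged, as proposed for `MultipleJobsDetails`, would skip the `toArray` step and therefore the extra copy.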
---