Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4688#discussion_r140226502
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
    @@ -258,6 +260,23 @@ public void start() throws Exception {
                                8));
        }
     
    +   @Override
    +   public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time 
timeout) {
    +           final int numberJobsRunning = jobManagerRunners.size();
    +
    +           ArrayList<CompletableFuture<JobDetails>> individualJobDetails = 
new ArrayList<>(numberJobsRunning);
    +
    +           for (JobManagerRunner jobManagerRunner : 
jobManagerRunners.values()) {
    +                   
individualJobDetails.add(jobManagerRunner.getJobManagerGateway().requestJobDetails(timeout));
    +           }
    +
    +           CompletableFuture<Collection<JobDetails>> combinedJobDetails = 
FutureUtils.combineAll(individualJobDetails);
    +
    +           return combinedJobDetails.thenApply(
    +                   (Collection<JobDetails> jobDetails) ->
    +                           new MultipleJobsDetails(jobDetails.toArray(new 
JobDetails[numberJobsRunning]), null));
    --- End diff --
    
    `FutureUtils.combineAll` returns a `Arrays.ArrayList`. This type is 
directly backed by the provided array. When calling `Arrays.ArrayList#toArray`, 
this array will be copied. Thus, I can change `MultipleJobsDetails` to store 
the individual `JobDetails` as a list. Then there should be no copy involved.


---

Reply via email to