dawidwys commented on pull request #17440:
URL: https://github.com/apache/flink/pull/17440#issuecomment-943278706


   So I spent some time looking into `waitForChannelActivation`. I am a bit 
concerned that the synchronous waiting may prolong the startup too much. If 
I understand the code correctly, we will sequentially wait for each channel to 
become active before requesting a partition on another channel. 
   
   As the initial error shows, activation may take some time. I was wondering 
whether we could return the future itself instead (I know we would have to pass 
it through a couple of layers). Then we could start buffer debloating once a 
combined future for the activation of all channels completes.
   Something like:
   
   ```
   CompletableFuture.allOf(
                   Arrays.stream(environment.getAllInputGates())
                           .map(InputGate::getAllChannelsActivatedFuture)
                           .toArray(CompletableFuture[]::new))
           .whenComplete(
                   (r, ex) -> {
                       scheduleBufferDebloater();
                   });
   ```
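
To make the idea easier to try out in isolation, here is a minimal, self-contained sketch of the same pattern. It is not Flink code: `FakeGate`, `markAllChannelsActive`, and `whenAllGatesActive` are hypothetical stand-ins invented for illustration, and `getAllChannelsActivatedFuture` is only the method name proposed above, not an existing `InputGate` API as far as this comment shows.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class DebloaterStartSketch {

    /** Hypothetical stand-in for an input gate; NOT Flink's InputGate. */
    static final class FakeGate {
        private final CompletableFuture<Void> activated = new CompletableFuture<>();

        /** Assumed accessor, named after the method proposed in the comment. */
        CompletableFuture<Void> getAllChannelsActivatedFuture() {
            return activated;
        }

        /** Simulates the moment when every channel of this gate became active. */
        void markAllChannelsActive() {
            activated.complete(null);
        }
    }

    /**
     * Registers a callback that runs once every gate has reported activation.
     * The caller is never blocked; it only gets a future back.
     */
    static CompletableFuture<Void> whenAllGatesActive(
            List<FakeGate> gates, Runnable onAllActive) {
        CompletableFuture<?>[] futures =
                gates.stream()
                        .map(FakeGate::getAllChannelsActivatedFuture)
                        .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures).thenRun(onAllActive);
    }

    public static void main(String[] args) {
        List<FakeGate> gates = List.of(new FakeGate(), new FakeGate());
        AtomicBoolean debloaterScheduled = new AtomicBoolean(false);

        // Stand-in for scheduleBufferDebloater().
        whenAllGatesActive(gates, () -> debloaterScheduled.set(true));

        gates.get(0).markAllChannelsActive();
        System.out.println("after first gate: " + debloaterScheduled.get());

        gates.get(1).markAllChannelsActive();
        System.out.println("after second gate: " + debloaterScheduled.get());
    }
}
```

One design difference worth noting: the sketch uses `thenRun`, which skips the callback if any activation future completes exceptionally, whereas `whenComplete` in the snippet above would invoke the lambda in both the success and the failure case. Which behaviour is right for the debloater is a separate decision.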

