tillrohrmann commented on a change in pull request #7568: [FLINK-11417] Make 
access to ExecutionGraph single threaded from JobMaster main thread
URL: https://github.com/apache/flink/pull/7568#discussion_r250612669
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ##########
 @@ -802,6 +804,89 @@ public void onComplete(Throwable failure, U success) {
                return result;
        }
 
+       /**
+        * This function takes a {@link CompletableFuture} and a function to 
apply to this future. If the input future
+        * is already done, this function returns {@link 
CompletableFuture#thenApply(Function)}. Otherwise, the return
+        * value is {@link CompletableFuture#thenApplyAsync(Function, 
Executor)} with the given executor.
+        *
+        * @param completableFuture the completable future for which we want to 
apply.
+        * @param executor the executor to run the apply function if the future 
is not yet done.
+        * @param applyFun the function to apply.
+        * @param <IN> type of the input future.
+        * @param <OUT> type of the output future.
+        * @return a completable future that is applying the given function to 
the input future.
+        */
+       public static <IN, OUT> CompletableFuture<OUT> applyAsyncIfNotDone(
+               CompletableFuture<IN> completableFuture,
+               Executor executor,
+               Function<? super IN, ? extends OUT> applyFun) {
+               return completableFuture.isDone() ?
+                       completableFuture.thenApply(applyFun) :
+                       completableFuture.thenApplyAsync(applyFun, executor);
+       }
+
+       /**
+        * This function takes a {@link CompletableFuture} and a function to 
compose with this future. If the input future
+        * is already done, this function returns {@link 
CompletableFuture#thenCompose(Function)}. Otherwise, the return
+        * value is {@link CompletableFuture#thenComposeAsync(Function, 
Executor)} with the given executor.
+        *
+        * @param completableFuture the completable future for which we want to 
compose.
+        * @param executor the executor to run the compose function if the 
future is not yet done.
+        * @param composeFun the function to compose.
+        * @param <IN> type of the input future.
+        * @param <OUT> type of the output future.
+        * @return a completable future that is a composition of the input 
future and the function.
+        */
+       public static <IN, OUT> CompletableFuture<OUT> composeAsyncIfNotDone(
+               CompletableFuture<IN> completableFuture,
+               Executor executor,
+               Function<? super IN, ? extends CompletionStage<OUT>> 
composeFun) {
+               return completableFuture.isDone() ?
+                       completableFuture.thenCompose(composeFun) :
+                       completableFuture.thenComposeAsync(composeFun, 
executor);
+       }
+
+       /**
+        * This function takes a {@link CompletableFuture} and a bi-consumer to 
call on completion of this future. If the
+        * input future is already done, this function returns {@link 
CompletableFuture#whenComplete(BiConsumer)}.
+        * Otherwise, the return value is {@link 
CompletableFuture#whenCompleteAsync(BiConsumer, Executor)} with the given
+        * executor.
+        *
+        * @param completableFuture the completable future for which we want to 
call #whenComplete.
+        * @param executor the executor to run the whenComplete function if the 
future is not yet done.
+        * @param whenCompleteFun the bi-consumer function to call when the 
future is completed.
+        * @param <IN> type of the input future.
+        * @return the new completion stage.
+        */
+       public static <IN> CompletableFuture<IN> whenCompleteAsyncIfNotDone(
+               CompletableFuture<IN> completableFuture,
+               Executor executor,
+               BiConsumer<? super IN, ? super Throwable> whenCompleteFun) {
+               return completableFuture.isDone() ?
+                       completableFuture.whenComplete(whenCompleteFun) :
+                       completableFuture.whenCompleteAsync(whenCompleteFun, 
executor);
+       }
+
+       /**
+        * This function takes a {@link CompletableFuture} and a consumer to 
accept the result of this future. If the input
+        * future is already done, this function returns {@link 
CompletableFuture#thenAccept(Consumer)}. Otherwise, the
+        * return value is {@link CompletableFuture#thenAcceptAsync(Consumer, 
Executor)} with the given executor.
+        *
+        * @param completableFuture the completable future for which we want to 
call #thenAccept.
+        * @param executor the executor to run the thenAccept function if the 
future is not yet done.
+        * @param consumer the consumer function to call when the future is 
completed.
+        * @param <IN> type of the input future.
+        * @return the new completion stage.
+        */
+       public static <IN> CompletableFuture<Void> thenAcceptAsyncIfNotDone(
 
 Review comment:
   Naming not consistent with other functions (here thenX and the others X)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to