[ 
https://issues.apache.org/jira/browse/FLINK-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16107754#comment-16107754
 ] 

ASF GitHub Bot commented on FLINK-7317:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4433#discussion_r130400792
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ---
    @@ -50,42 +49,38 @@
         * @param <T> type of the result
         * @return Future containing either the result of the operation or a 
{@link RetryException}
         */
    -   public static <T> Future<T> retry(
    -           final Callable<Future<T>> operation,
    +   public static <T> java.util.concurrent.CompletableFuture<T> retry(
    +           final Callable<java.util.concurrent.CompletableFuture<T>> 
operation,
                final int retries,
                final Executor executor) {
     
    -           Future<T> operationResultFuture;
    +           java.util.concurrent.CompletableFuture<T> operationResultFuture;
     
                try {
                        operationResultFuture = operation.call();
                } catch (Exception e) {
    -                   return FlinkCompletableFuture.completedExceptionally(
    -                           new RetryException("Could not execute the 
provided operation.", e));
    +                   java.util.concurrent.CompletableFuture<T> 
exceptionResult = new java.util.concurrent.CompletableFuture<>();
    +                   exceptionResult.completeExceptionally(new 
RetryException("Could not execute the provided operation.", e));
    +                   return exceptionResult;
                }
     
    -           return operationResultFuture.handleAsync(new BiFunction<T, 
Throwable, Future<T>>() {
    -                   @Override
    -                   public Future<T> apply(T t, Throwable throwable) {
    +           return operationResultFuture.handleAsync(
    +                   (t, throwable) -> {
                                if (throwable != null) {
                                        if (retries > 0) {
                                                return retry(operation, retries 
- 1, executor);
                                        } else {
    -                                           return 
FlinkCompletableFuture.completedExceptionally(
    -                                                   new 
RetryException("Could not complete the operation. Number of retries " +
    -                                                           "has been 
exhausted.", throwable));
    +                                           
java.util.concurrent.CompletableFuture<T> exceptionResult = new 
java.util.concurrent.CompletableFuture<>();
    +                                           
exceptionResult.completeExceptionally(new RetryException("Could not complete 
the operation. Number of retries " +
    +                                                   "has been exhausted.", 
throwable));
    +                                           return exceptionResult;
                                        }
                                } else {
    -                                   return 
FlinkCompletableFuture.completed(t);
    +                                   return 
java.util.concurrent.CompletableFuture.completedFuture(t);
                                }
    -                   }
    -           }, executor)
    -           .thenCompose(new ApplyFunction<Future<T>, Future<T>>() {
    -                   @Override
    -                   public Future<T> apply(Future<T> value) {
    -                           return value;
    -                   }
    -           });
    +                   },
    +                   executor)
    +           .thenCompose(value -> value);
    --- End diff --
    
    what is this for?


> Remove Flink's futures from ExecutionGraph
> ------------------------------------------
>
>                 Key: FLINK-7317
>                 URL: https://issues.apache.org/jira/browse/FLINK-7317
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to