zentol commented on a change in pull request #12932:
URL: https://github.com/apache/flink/pull/12932#discussion_r458056398



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
##########
@@ -439,10 +453,30 @@ public RetryException(Throwable cause) {
                long timeout,
                TimeUnit timeUnit,
                Executor timeoutFailExecutor) {
+               return orTimeout(future, timeout, timeUnit, timeoutFailExecutor, null);
+       }
+
+       /**
+        * Times the given future out after the timeout.
+        *
+        * @param future to time out
+        * @param timeout after which the given future is timed out
+        * @param timeUnit time unit of the timeout
+        * @param timeoutFailExecutor executor that will complete the future exceptionally after the timeout is reached
+        * @param timeoutMsg timeout message for exception
+        * @param <T> type of the given future
+        * @return The timeout enriched future
+        */
+       public static <T> CompletableFuture<T> orTimeout(
+               CompletableFuture<T> future,
+               long timeout,
+               TimeUnit timeUnit,
+               Executor timeoutFailExecutor,
+               String timeoutMsg) {

Review comment:
       The `timeoutMsg` parameter is missing `@Nullable`.
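
       For illustration, a minimal sketch of an annotated signature. This is not the Flink implementation: the class name `NullableTimeoutSketch`, the locally declared `Nullable` annotation (a stand-in for `javax.annotation.Nullable`), and the use of a `ScheduledExecutorService` as the timer are all assumptions made to keep the example self-contained.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class NullableTimeoutSketch {

    // Assumption: local stand-in for javax.annotation.Nullable so the sketch compiles alone.
    @Documented
    @Target({ElementType.PARAMETER, ElementType.FIELD})
    @interface Nullable {}

    // Simplified variant: the timer both schedules and runs the timeout action.
    static <T> CompletableFuture<T> orTimeout(
            CompletableFuture<T> future,
            long timeout,
            TimeUnit timeUnit,
            ScheduledExecutorService timer,
            @Nullable String timeoutMsg) {
        if (!future.isDone()) {
            ScheduledFuture<?> task = timer.schedule(
                    () -> {
                        future.completeExceptionally(new TimeoutException(timeoutMsg));
                    },
                    timeout,
                    timeUnit);
            // Cancel the pending timeout once the future completes for any reason.
            future.whenComplete((value, error) -> task.cancel(false));
        }
        return future;
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Passing null is legal here, which is what the annotation documents.
            orTimeout(new CompletableFuture<String>(), 10, TimeUnit.MILLISECONDS, timer, null).join();
        } catch (CompletionException e) {
            System.out.println("timed out: " + e.getCause());
        } finally {
            timer.shutdownNow();
        }
    }
}
```

       The annotation does not change behavior; it only tells callers and static analysis that `null` is an accepted argument, which the delegating overload relies on.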

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
##########
@@ -1026,14 +1060,16 @@ public void onComplete(Throwable failure, U success) {
        private static final class Timeout implements Runnable {
 
                private final CompletableFuture<?> future;
+               private final String timeoutMsg;
 
-               private Timeout(CompletableFuture<?> future) {
+               private Timeout(CompletableFuture<?> future, @Nullable String timeoutMsg) {
                        this.future = checkNotNull(future);
+                       this.timeoutMsg = timeoutMsg;
                }
 
                @Override
                public void run() {
-                       future.completeExceptionally(new TimeoutException());
+                       future.completeExceptionally(null == timeoutMsg ? new TimeoutException() : new TimeoutException(timeoutMsg));

Review comment:
       This distinction isn't necessary; `new TimeoutException(timeoutMsg)` will work in any case, since the constructor accepts a `null` message.
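
       A small check of that claim, as a sketch: the class and method names (`TimeoutMessageDemo`, `withDistinction`, `withoutDistinction`) are hypothetical and exist only for this comparison. It relies on `Throwable` storing a `null` detail message the same way for the no-argument and the `String` constructor.

```java
import java.util.Objects;
import java.util.concurrent.TimeoutException;

public class TimeoutMessageDemo {

    // The form currently in the PR: branch on a null message.
    static TimeoutException withDistinction(String timeoutMsg) {
        return null == timeoutMsg ? new TimeoutException() : new TimeoutException(timeoutMsg);
    }

    // The suggested simplification: always pass the message through.
    static TimeoutException withoutDistinction(String timeoutMsg) {
        return new TimeoutException(timeoutMsg);
    }

    // True if both variants yield the same message and the same toString().
    static boolean equivalent(String timeoutMsg) {
        TimeoutException a = withDistinction(timeoutMsg);
        TimeoutException b = withoutDistinction(timeoutMsg);
        return Objects.equals(a.getMessage(), b.getMessage())
                && a.toString().equals(b.toString());
    }

    public static void main(String[] args) {
        System.out.println("null message equivalent: " + equivalent(null));
        System.out.println("non-null message equivalent: " + equivalent("operation timed out"));
    }
}
```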




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

