This is an automated email from the ASF dual-hosted git repository.

wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a81ffa6d019 [FLINK-27144][runtime] Provide timeout details when 
calling FutureUtils.orTimeout
a81ffa6d019 is described below

commit a81ffa6d019d9891bd3a54f50fb36ad847721daa
Author: Lijie Wang <[email protected]>
AuthorDate: Thu Apr 6 14:29:35 2023 +0800

    [FLINK-27144][runtime] Provide timeout details when calling 
FutureUtils.orTimeout
    
    This closes #22367
---
 .../client/program/rest/RestClusterClient.java     |  5 +++-
 .../apache/flink/util/concurrent/FutureUtils.java  | 33 ----------------------
 .../flink/util/concurrent/FutureUtilsTest.java     |  3 +-
 .../slotpool/DeclarativeSlotPoolBridge.java        |  5 +++-
 .../handler/async/CompletedOperationCache.java     |  5 +++-
 .../scheduler/DefaultExecutionDeployer.java        |  6 +++-
 .../runtime/taskexecutor/TaskManagerRunner.java    |  7 ++++-
 .../OperatorEventSendingCheckpointITCase.java      |  7 ++++-
 8 files changed, 31 insertions(+), 40 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index c81619a97bf..8f65777cfec 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -1049,7 +1049,10 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
         return FutureUtils.orTimeout(
                         webMonitorLeaderRetriever.getLeaderFuture(),
                         restClusterClientConfiguration.getAwaitLeaderTimeout(),
-                        TimeUnit.MILLISECONDS)
+                        TimeUnit.MILLISECONDS,
+                        String.format(
+                                "Waiting for leader address of 
WebMonitorEndpoint timed out after %d ms.",
+                                
restClusterClientConfiguration.getAwaitLeaderTimeout()))
                 .thenApplyAsync(
                         leaderAddressSessionId -> {
                             final String url = leaderAddressSessionId.f0;
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
index 7a017651f61..2ab034e023f 100644
--- a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
@@ -413,20 +413,6 @@ public class FutureUtils {
         }
     }
 
-    /**
-     * Times the given future out after the timeout.
-     *
-     * @param future to time out
-     * @param timeout after which the given future is timed out
-     * @param timeUnit time unit of the timeout
-     * @param <T> type of the given future
-     * @return The timeout enriched future
-     */
-    public static <T> CompletableFuture<T> orTimeout(
-            CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
-        return orTimeout(future, timeout, timeUnit, 
Executors.directExecutor(), null);
-    }
-
     /**
      * Times the given future out after the timeout.
      *
@@ -445,25 +431,6 @@ public class FutureUtils {
         return orTimeout(future, timeout, timeUnit, 
Executors.directExecutor(), timeoutMsg);
     }
 
-    /**
-     * Times the given future out after the timeout.
-     *
-     * @param future to time out
-     * @param timeout after which the given future is timed out
-     * @param timeUnit time unit of the timeout
-     * @param timeoutFailExecutor executor that will complete the future 
exceptionally after the
-     *     timeout is reached
-     * @param <T> type of the given future
-     * @return The timeout enriched future
-     */
-    public static <T> CompletableFuture<T> orTimeout(
-            CompletableFuture<T> future,
-            long timeout,
-            TimeUnit timeUnit,
-            Executor timeoutFailExecutor) {
-        return orTimeout(future, timeout, timeUnit, timeoutFailExecutor, null);
-    }
-
     /**
      * Times the given future out after the timeout.
      *
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
 
b/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
index a9e0e47f9f0..2f0cd8c253a 100644
--- 
a/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
@@ -294,12 +294,13 @@ public class FutureUtilsTest extends TestLogger {
         final CompletableFuture<String> future = new CompletableFuture<>();
         final long timeout = 10L;
 
-        FutureUtils.orTimeout(future, timeout, TimeUnit.MILLISECONDS);
+        FutureUtils.orTimeout(future, timeout, TimeUnit.MILLISECONDS, 
"testOrTimeout");
 
         try {
             future.get();
         } catch (ExecutionException e) {
             assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
TimeoutException);
+            ExceptionUtils.assertThrowableWithMessage(e, "testOrTimeout");
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index c7ae507ad6c..3cb0d04ed19 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -325,7 +325,10 @@ public class DeclarativeSlotPoolBridge extends 
DeclarativeSlotPoolService implem
                             pendingRequest.getSlotFuture(),
                             timeout.toMilliseconds(),
                             TimeUnit.MILLISECONDS,
-                            componentMainThreadExecutor)
+                            componentMainThreadExecutor,
+                            String.format(
+                                    "Pending slot request %s timed out after 
%d ms.",
+                                    pendingRequest.getSlotRequestId(), 
timeout.toMilliseconds()))
                     .whenComplete(
                             (physicalSlot, throwable) -> {
                                 if (throwable instanceof TimeoutException) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
index 3bd34d6602b..bbcc312024a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
@@ -187,7 +187,10 @@ public class CompletedOperationCache<K extends 
OperationKey, R extends Serializa
                         FutureUtils.orTimeout(
                                 asyncWaitForResultsToBeAccessed(),
                                 cacheDuration.getSeconds(),
-                                TimeUnit.SECONDS);
+                                TimeUnit.SECONDS,
+                                String.format(
+                                        "Waiting for results to be accessed 
timed out after %s seconds.",
+                                        cacheDuration.getSeconds()));
             }
 
             return terminationFuture;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
index a12d0cba06c..99b3bcd04e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
@@ -280,7 +280,11 @@ public class DefaultExecutionDeployer implements 
ExecutionDeployer {
                         partitionRegistrationFuture,
                         partitionRegistrationTimeout.toMilliseconds(),
                         TimeUnit.MILLISECONDS,
-                        mainThreadExecutor);
+                        mainThreadExecutor,
+                        String.format(
+                                "Registering produced partitions for execution 
%s timed out after %d ms.",
+                                execution.getAttemptId(),
+                                
partitionRegistrationTimeout.toMilliseconds()));
             } else {
                 return FutureUtils.completedVoidFuture();
             }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 0bbd2d9ee6a..258c5ca9be2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -439,7 +439,12 @@ public class TaskManagerRunner implements 
FatalErrorHandler {
             closeAsync(Result.FAILURE);
 
             FutureUtils.orTimeout(
-                    terminationFuture, FATAL_ERROR_SHUTDOWN_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+                    terminationFuture,
+                    FATAL_ERROR_SHUTDOWN_TIMEOUT_MS,
+                    TimeUnit.MILLISECONDS,
+                    String.format(
+                            "Waiting for TaskManager shutting down timed out 
after %s ms.",
+                            FATAL_ERROR_SHUTDOWN_TIMEOUT_MS));
         }
     }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index 67dcffa8cb2..a1da741ce04 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -259,7 +259,12 @@ public class OperatorEventSendingCheckpointITCase extends 
TestLogger {
 
     private static CompletableFuture<Acknowledge> askTimeoutFuture() {
         final CompletableFuture<Acknowledge> future = new 
CompletableFuture<>();
-        FutureUtils.orTimeout(future, 500, TimeUnit.MILLISECONDS);
+        final long timeout = 500;
+        FutureUtils.orTimeout(
+                future,
+                timeout,
+                TimeUnit.MILLISECONDS,
+                String.format("Future timed out after %s ms.", timeout));
         return future;
     }
 

Reply via email to