This is an automated email from the ASF dual-hosted git repository.
wanglijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a81ffa6d019 [FLINK-27144][runtime] Provide timeout details when
calling FutureUtils.orTimeout
a81ffa6d019 is described below
commit a81ffa6d019d9891bd3a54f50fb36ad847721daa
Author: Lijie Wang <[email protected]>
AuthorDate: Thu Apr 6 14:29:35 2023 +0800
[FLINK-27144][runtime] Provide timeout details when calling
FutureUtils.orTimeout
This closes #22367
---
.../client/program/rest/RestClusterClient.java | 5 +++-
.../apache/flink/util/concurrent/FutureUtils.java | 33 ----------------------
.../flink/util/concurrent/FutureUtilsTest.java | 3 +-
.../slotpool/DeclarativeSlotPoolBridge.java | 5 +++-
.../handler/async/CompletedOperationCache.java | 5 +++-
.../scheduler/DefaultExecutionDeployer.java | 6 +++-
.../runtime/taskexecutor/TaskManagerRunner.java | 7 ++++-
.../OperatorEventSendingCheckpointITCase.java | 7 ++++-
8 files changed, 31 insertions(+), 40 deletions(-)
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index c81619a97bf..8f65777cfec 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -1049,7 +1049,10 @@ public class RestClusterClient<T> implements
ClusterClient<T> {
return FutureUtils.orTimeout(
webMonitorLeaderRetriever.getLeaderFuture(),
restClusterClientConfiguration.getAwaitLeaderTimeout(),
- TimeUnit.MILLISECONDS)
+ TimeUnit.MILLISECONDS,
+ String.format(
+ "Waiting for leader address of
WebMonitorEndpoint timed out after %d ms.",
+
restClusterClientConfiguration.getAwaitLeaderTimeout()))
.thenApplyAsync(
leaderAddressSessionId -> {
final String url = leaderAddressSessionId.f0;
diff --git
a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
index 7a017651f61..2ab034e023f 100644
--- a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java
@@ -413,20 +413,6 @@ public class FutureUtils {
}
}
- /**
- * Times the given future out after the timeout.
- *
- * @param future to time out
- * @param timeout after which the given future is timed out
- * @param timeUnit time unit of the timeout
- * @param <T> type of the given future
- * @return The timeout enriched future
- */
- public static <T> CompletableFuture<T> orTimeout(
- CompletableFuture<T> future, long timeout, TimeUnit timeUnit) {
- return orTimeout(future, timeout, timeUnit,
Executors.directExecutor(), null);
- }
-
/**
* Times the given future out after the timeout.
*
@@ -445,25 +431,6 @@ public class FutureUtils {
return orTimeout(future, timeout, timeUnit,
Executors.directExecutor(), timeoutMsg);
}
- /**
- * Times the given future out after the timeout.
- *
- * @param future to time out
- * @param timeout after which the given future is timed out
- * @param timeUnit time unit of the timeout
- * @param timeoutFailExecutor executor that will complete the future
exceptionally after the
- * timeout is reached
- * @param <T> type of the given future
- * @return The timeout enriched future
- */
- public static <T> CompletableFuture<T> orTimeout(
- CompletableFuture<T> future,
- long timeout,
- TimeUnit timeUnit,
- Executor timeoutFailExecutor) {
- return orTimeout(future, timeout, timeUnit, timeoutFailExecutor, null);
- }
-
/**
* Times the given future out after the timeout.
*
diff --git
a/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
b/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
index a9e0e47f9f0..2f0cd8c253a 100644
---
a/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
+++
b/flink-core/src/test/java/org/apache/flink/util/concurrent/FutureUtilsTest.java
@@ -294,12 +294,13 @@ public class FutureUtilsTest extends TestLogger {
final CompletableFuture<String> future = new CompletableFuture<>();
final long timeout = 10L;
- FutureUtils.orTimeout(future, timeout, TimeUnit.MILLISECONDS);
+ FutureUtils.orTimeout(future, timeout, TimeUnit.MILLISECONDS,
"testOrTimeout");
try {
future.get();
} catch (ExecutionException e) {
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof
TimeoutException);
+ ExceptionUtils.assertThrowableWithMessage(e, "testOrTimeout");
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
index c7ae507ad6c..3cb0d04ed19 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java
@@ -325,7 +325,10 @@ public class DeclarativeSlotPoolBridge extends
DeclarativeSlotPoolService implem
pendingRequest.getSlotFuture(),
timeout.toMilliseconds(),
TimeUnit.MILLISECONDS,
- componentMainThreadExecutor)
+ componentMainThreadExecutor,
+ String.format(
+ "Pending slot request %s timed out after
%d ms.",
+ pendingRequest.getSlotRequestId(),
timeout.toMilliseconds()))
.whenComplete(
(physicalSlot, throwable) -> {
if (throwable instanceof TimeoutException) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
index 3bd34d6602b..bbcc312024a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
@@ -187,7 +187,10 @@ public class CompletedOperationCache<K extends
OperationKey, R extends Serializa
FutureUtils.orTimeout(
asyncWaitForResultsToBeAccessed(),
cacheDuration.getSeconds(),
- TimeUnit.SECONDS);
+ TimeUnit.SECONDS,
+ String.format(
+ "Waiting for results to be accessed
timed out after %s seconds.",
+ cacheDuration.getSeconds()));
}
return terminationFuture;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
index a12d0cba06c..99b3bcd04e5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionDeployer.java
@@ -280,7 +280,11 @@ public class DefaultExecutionDeployer implements
ExecutionDeployer {
partitionRegistrationFuture,
partitionRegistrationTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS,
- mainThreadExecutor);
+ mainThreadExecutor,
+ String.format(
+ "Registering produced partitions for execution
%s timed out after %d ms.",
+ execution.getAttemptId(),
+
partitionRegistrationTimeout.toMilliseconds()));
} else {
return FutureUtils.completedVoidFuture();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 0bbd2d9ee6a..258c5ca9be2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -439,7 +439,12 @@ public class TaskManagerRunner implements
FatalErrorHandler {
closeAsync(Result.FAILURE);
FutureUtils.orTimeout(
- terminationFuture, FATAL_ERROR_SHUTDOWN_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
+ terminationFuture,
+ FATAL_ERROR_SHUTDOWN_TIMEOUT_MS,
+ TimeUnit.MILLISECONDS,
+ String.format(
+ "Waiting for TaskManager shutting down timed out
after %s ms.",
+ FATAL_ERROR_SHUTDOWN_TIMEOUT_MS));
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index 67dcffa8cb2..a1da741ce04 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -259,7 +259,12 @@ public class OperatorEventSendingCheckpointITCase extends
TestLogger {
private static CompletableFuture<Acknowledge> askTimeoutFuture() {
final CompletableFuture<Acknowledge> future = new
CompletableFuture<>();
- FutureUtils.orTimeout(future, 500, TimeUnit.MILLISECONDS);
+ final long timeout = 500;
+ FutureUtils.orTimeout(
+ future,
+ timeout,
+ TimeUnit.MILLISECONDS,
+ String.format("Future timed out after %s ms.", timeout));
return future;
}