This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new baa870a5829 KAFKA-18214 TestUtils#waitForCondition does not honor the
maxWaitMs (#18145)
baa870a5829 is described below
commit baa870a5829b0b3184cadb2c68ea3276a7f1dd3c
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Fri Dec 13 07:44:57 2024 +0800
KAFKA-18214 TestUtils#waitForCondition does not honor the maxWaitMs (#18145)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/common/test/KafkaClusterTestKit.java | 7 ++---
.../org/apache/kafka/common/test/TestUtils.java | 33 ++++++----------------
2 files changed, 12 insertions(+), 28 deletions(-)
diff --git
a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 24df147b21e..2face50ca2f 100644
---
a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++
b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -539,15 +539,14 @@ public class KafkaClusterTestKit implements AutoCloseable
{
public Controller waitForActiveController() throws InterruptedException {
AtomicReference<Controller> active = new AtomicReference<>(null);
- TestUtils.retryOnExceptionWithTimeout(() -> {
+ TestUtils.waitForCondition(() -> {
for (ControllerServer controllerServer : controllers.values()) {
if (controllerServer.controller().isActive()) {
active.set(controllerServer.controller());
}
}
- if (active.get() == null)
- throw new RuntimeException("Controller not active");
- });
+ return active.get() != null;
+ }, 60_000, "Controller not active");
return active.get();
}
diff --git
a/test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java
b/test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java
index f55b1be5c05..d5f98be24b7 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java
@@ -53,7 +53,6 @@ public class TestUtils {
private static final long DEFAULT_POLL_INTERVAL_MS = 100;
private static final long DEFAULT_MAX_WAIT_MS = 15_000;
- private static final long DEFAULT_TIMEOUT_MS = 60_000;
/**
* Create an empty file in the default temporary-file directory, using
`kafka` as the prefix and `tmp` as the
@@ -117,40 +116,26 @@ public class TestUtils {
*/
public static void waitForCondition(final Supplier<Boolean> testCondition,
final long maxWaitMs,
- String conditionDetails
- ) throws InterruptedException {
- retryOnExceptionWithTimeout(() -> {
- String conditionDetail = conditionDetails == null ? "" :
conditionDetails;
- if (!testCondition.get())
- throw new TimeoutException("Condition not met within timeout "
+ maxWaitMs + ". " + conditionDetail);
- });
- }
-
- /**
- * Wait for the given runnable to complete successfully, i.e. throw now
{@link Exception}s or
- * {@link AssertionError}s, or for the given timeout to expire. If the
timeout expires then the
- * last exception or assertion failure will be thrown thus providing
context for the failure.
- *
- * @param runnable the code to attempt to execute successfully.
- * @throws InterruptedException if the current thread is interrupted while
waiting for {@code runnable} to complete successfully.
- */
- static void retryOnExceptionWithTimeout(final Runnable runnable) throws
InterruptedException {
- final long expectedEnd = System.currentTimeMillis() +
DEFAULT_TIMEOUT_MS;
+ String conditionDetails) throws
InterruptedException {
+ final long expectedEnd = System.currentTimeMillis() + maxWaitMs;
while (true) {
try {
- runnable.run();
- return;
+ if (testCondition.get()) {
+ return;
+ }
+ String conditionDetail = conditionDetails == null ? "" :
conditionDetails;
+ throw new TimeoutException("Condition not met: " +
conditionDetail);
} catch (final AssertionError t) {
if (expectedEnd <= System.currentTimeMillis()) {
throw t;
}
} catch (final Exception e) {
if (expectedEnd <= System.currentTimeMillis()) {
- throw new AssertionError(format("Assertion failed with an
exception after %s ms", DEFAULT_TIMEOUT_MS), e);
+ throw new AssertionError(format("Assertion failed with an
exception after %s ms", maxWaitMs), e);
}
}
- Thread.sleep(DEFAULT_POLL_INTERVAL_MS);
+ Thread.sleep(Math.min(DEFAULT_POLL_INTERVAL_MS, maxWaitMs));
}
}