This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new baa870a5829 KAFKA-18214 TestUtils#waitForCondition does not honor the maxWaitMs (#18145)
baa870a5829 is described below

commit baa870a5829b0b3184cadb2c68ea3276a7f1dd3c
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Fri Dec 13 07:44:57 2024 +0800

    KAFKA-18214 TestUtils#waitForCondition does not honor the maxWaitMs (#18145)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/common/test/KafkaClusterTestKit.java     |  7 ++---
 .../org/apache/kafka/common/test/TestUtils.java    | 33 ++++++----------------
 2 files changed, 12 insertions(+), 28 deletions(-)

diff --git a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 24df147b21e..2face50ca2f 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -539,15 +539,14 @@ public class KafkaClusterTestKit implements AutoCloseable {
 
     public Controller waitForActiveController() throws InterruptedException {
         AtomicReference<Controller> active = new AtomicReference<>(null);
-        TestUtils.retryOnExceptionWithTimeout(() -> {
+        TestUtils.waitForCondition(() -> {
             for (ControllerServer controllerServer : controllers.values()) {
                 if (controllerServer.controller().isActive()) {
                     active.set(controllerServer.controller());
                 }
             }
-            if (active.get() == null)
-                throw new RuntimeException("Controller not active");
-        });
+            return active.get() != null;
+        }, 60_000, "Controller not active");
         return active.get();
     }
 
diff --git a/test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java b/test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java
index f55b1be5c05..d5f98be24b7 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/TestUtils.java
@@ -53,7 +53,6 @@ public class TestUtils {
 
     private static final long DEFAULT_POLL_INTERVAL_MS = 100;
     private static final long DEFAULT_MAX_WAIT_MS = 15_000;
-    private static final long DEFAULT_TIMEOUT_MS = 60_000;
 
     /**
      * Create an empty file in the default temporary-file directory, using `kafka` as the prefix and `tmp` as the
@@ -117,40 +116,26 @@ public class TestUtils {
      */
     public static void waitForCondition(final Supplier<Boolean> testCondition, 
                                         final long maxWaitMs, 
-                                        String conditionDetails
-    ) throws InterruptedException {
-        retryOnExceptionWithTimeout(() -> {
-            String conditionDetail = conditionDetails == null ? "" : conditionDetails;
-            if (!testCondition.get())
-                throw new TimeoutException("Condition not met within timeout " + maxWaitMs + ". " + conditionDetail);
-        });
-    }
-
-    /**
-     * Wait for the given runnable to complete successfully, i.e. throw now {@link Exception}s or
-     * {@link AssertionError}s, or for the given timeout to expire. If the timeout expires then the
-     * last exception or assertion failure will be thrown thus providing context for the failure.
-     *
-     * @param runnable the code to attempt to execute successfully.
-     * @throws InterruptedException if the current thread is interrupted while waiting for {@code runnable} to complete successfully.
-     */
-    static void retryOnExceptionWithTimeout(final Runnable runnable) throws InterruptedException {
-        final long expectedEnd = System.currentTimeMillis() + DEFAULT_TIMEOUT_MS;
+                                        String conditionDetails) throws InterruptedException {
+        final long expectedEnd = System.currentTimeMillis() + maxWaitMs;
 
         while (true) {
             try {
-                runnable.run();
-                return;
+                if (testCondition.get()) {
+                    return;
+                }
+                String conditionDetail = conditionDetails == null ? "" : conditionDetails;
+                throw new TimeoutException("Condition not met: " + conditionDetail);
             } catch (final AssertionError t) {
                 if (expectedEnd <= System.currentTimeMillis()) {
                     throw t;
                 }
             } catch (final Exception e) {
                 if (expectedEnd <= System.currentTimeMillis()) {
-                    throw new AssertionError(format("Assertion failed with an exception after %s ms", DEFAULT_TIMEOUT_MS), e);
+                    throw new AssertionError(format("Assertion failed with an exception after %s ms", maxWaitMs), e);
                 }
             }
-            Thread.sleep(DEFAULT_POLL_INTERVAL_MS);
+            Thread.sleep(Math.min(DEFAULT_POLL_INTERVAL_MS, maxWaitMs));
         }
     }
 

Reply via email to