This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 679854c9b69 KAFKA-19896: Use MockTime::sleep in 
ShareConsumeRequestManagerTest. (#20922)
679854c9b69 is described below

commit 679854c9b6957c54952ab1cef0279e23c8e79125
Author: Shivsundar R <[email protected]>
AuthorDate: Wed Nov 19 06:36:51 2025 -0500

    KAFKA-19896: Use MockTime::sleep in ShareConsumeRequestManagerTest. (#20922)
    
    *What*
    
    - Currently, when there is a backoff wait period, we are retrying the
    acknowledgements until the backoff period completes and then these acks
    are sent.
    
    - But as this is a unit test, we can use time.sleep() to forward the
    `currentTime`, which will allow the backoff period to be over.
    
    - PR fixes the 4 tests in `ShareConsumeRequestManagerTest` to use
    `MockTime::sleep`. This makes the tests faster as we do not actually
    need to wait for the backoff. We can just update the value.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../internals/ShareConsumeRequestManagerTest.java  | 31 +++++++++++++---------
 1 file changed, 19 insertions(+), 12 deletions(-)

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 9f20a3b014d..dc05257bd6e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -77,7 +77,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -366,14 +365,15 @@ public class ShareConsumeRequestManagerTest {
 
         assertEquals(1, 
shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
 
-        TestUtils.retryOnExceptionWithTimeout(() -> {
-            assertEquals(0, shareConsumeRequestManager.sendAcknowledgements());
-            // We expect the remaining acknowledgements to be cleared due to 
share session epoch being set to 0.
-            assertNull(shareConsumeRequestManager.requestStates(0));
-            // The callback for these unsent acknowledgements will be invoked 
with an error code.
-            assertEquals(Map.of(tip0, acknowledgements2), 
completedAcknowledgements.get(0));
-            assertInstanceOf(ShareSessionNotFoundException.class, 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
-        });
+        // Wait for backoff time before sending the next request.
+        time.sleep(retryBackoffMs);
+
+        assertEquals(0, shareConsumeRequestManager.sendAcknowledgements());
+        // We expect the remaining acknowledgements to be cleared due to share 
session epoch being set to 0.
+        assertNull(shareConsumeRequestManager.requestStates(0));
+        // The callback for these unsent acknowledgements will be invoked with 
an error code.
+        assertEquals(Map.of(tip0, acknowledgements2), 
completedAcknowledgements.get(0));
+        assertInstanceOf(ShareSessionNotFoundException.class, 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
 
         // Attempt a normal fetch to check if nodesWithPendingRequests is 
empty.
         assertEquals(1, sendFetches());
@@ -653,7 +653,10 @@ public class ShareConsumeRequestManagerTest {
         assertEquals(6, 
shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0));
         assertEquals(0, 
shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0));
 
-        TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, 
shareConsumeRequestManager.sendAcknowledgements()));
+        // Wait for backoff time before sending the next request.
+        // After the first attempt, it can maximum be 1.2x of the configured 
backoff when acknowledge fails. (jitter = 0.2)
+        time.sleep((long) (1.5 * retryBackoffMs));
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
 
         assertEquals(6, 
shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0));
 
@@ -1232,7 +1235,9 @@ public class ShareConsumeRequestManagerTest {
         shareConsumeRequestManager.commitAsync(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements2)),
                 calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
 
-        TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, 
shareConsumeRequestManager.sendAcknowledgements()));
+        // Wait for backoff time before sending the next request.
+        time.sleep(retryBackoffMs);
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
 
         client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
         networkClientDelegate.poll(time.timer(0));
@@ -1401,7 +1406,9 @@ public class ShareConsumeRequestManagerTest {
 
         assertEquals(1, 
shareConsumeRequestManager.requestStates(0).getAsyncRequest().getIncompleteAcknowledgementsCount(tip0));
 
-        TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1, 
shareConsumeRequestManager.sendAcknowledgements()));
+        // Wait for backoff time before sending the next request. (it can 
maximum be 1.2x of the configured backoff when acknowledge fails.)
+        time.sleep((long) (1.5 * retryBackoffMs));
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
 
         client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
         networkClientDelegate.poll(time.timer(0));

Reply via email to