This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 679854c9b69 KAFKA-19896: Use MockTime::sleep in
ShareConsumeRequestManagerTest. (#20922)
679854c9b69 is described below
commit 679854c9b6957c54952ab1cef0279e23c8e79125
Author: Shivsundar R <[email protected]>
AuthorDate: Wed Nov 19 06:36:51 2025 -0500
KAFKA-19896: Use MockTime::sleep in ShareConsumeRequestManagerTest. (#20922)
*What*
- Currently, when there is a backoff wait period, we are retrying the
acknowledgements until the backoff period completes and then these acks
are sent.
- But as this is a unit test, we can use time.sleep() to forward the
`currentTime`, which will allow the backoff period to be over.
- PR fixes the 4 tests in `ShareConsumeRequestManagerTest` to use
`MockTime::sleep`. This makes the tests faster as we do not actually
need to wait for the backoff. We can just update the value.
Reviewers: Andrew Schofield <[email protected]>
---
.../internals/ShareConsumeRequestManagerTest.java | 31 +++++++++++++---------
1 file changed, 19 insertions(+), 12 deletions(-)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 9f20a3b014d..dc05257bd6e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -77,7 +77,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -366,14 +365,15 @@ public class ShareConsumeRequestManagerTest {
assertEquals(1,
shareConsumeRequestManager.requestStates(0).getAsyncRequest().getAcknowledgementsToSendCount(tip0));
- TestUtils.retryOnExceptionWithTimeout(() -> {
- assertEquals(0, shareConsumeRequestManager.sendAcknowledgements());
- // We expect the remaining acknowledgements to be cleared due to
share session epoch being set to 0.
- assertNull(shareConsumeRequestManager.requestStates(0));
- // The callback for these unsent acknowledgements will be invoked
with an error code.
- assertEquals(Map.of(tip0, acknowledgements2),
completedAcknowledgements.get(0));
- assertInstanceOf(ShareSessionNotFoundException.class,
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
- });
+ // Wait for backoff time before sending the next request.
+ time.sleep(retryBackoffMs);
+
+ assertEquals(0, shareConsumeRequestManager.sendAcknowledgements());
+ // We expect the remaining acknowledgements to be cleared due to share
session epoch being set to 0.
+ assertNull(shareConsumeRequestManager.requestStates(0));
+ // The callback for these unsent acknowledgements will be invoked with
an error code.
+ assertEquals(Map.of(tip0, acknowledgements2),
completedAcknowledgements.get(0));
+ assertInstanceOf(ShareSessionNotFoundException.class,
completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
// Attempt a normal fetch to check if nodesWithPendingRequests is
empty.
assertEquals(1, sendFetches());
@@ -653,7 +653,10 @@ public class ShareConsumeRequestManagerTest {
assertEquals(6,
shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getIncompleteAcknowledgementsCount(tip0));
assertEquals(0,
shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0));
- TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1,
shareConsumeRequestManager.sendAcknowledgements()));
+ // Wait for backoff time before sending the next request.
+ // After the first attempt, it can maximum be 1.2x of the configured
backoff when acknowledge fails. (jitter = 0.2)
+ time.sleep((long) (1.5 * retryBackoffMs));
+ assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
assertEquals(6,
shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek().getInFlightAcknowledgementsCount(tip0));
@@ -1232,7 +1235,9 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.commitAsync(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements2)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
- TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1,
shareConsumeRequestManager.sendAcknowledgements()));
+ // Wait for backoff time before sending the next request.
+ time.sleep(retryBackoffMs);
+ assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
client.prepareResponse(fullAcknowledgeResponse(tip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));
@@ -1401,7 +1406,9 @@ public class ShareConsumeRequestManagerTest {
assertEquals(1,
shareConsumeRequestManager.requestStates(0).getAsyncRequest().getIncompleteAcknowledgementsCount(tip0));
- TestUtils.retryOnExceptionWithTimeout(() -> assertEquals(1,
shareConsumeRequestManager.sendAcknowledgements()));
+ // Wait for backoff time before sending the next request. (it can
maximum be 1.2x of the configured backoff when acknowledge fails.)
+ time.sleep((long) (1.5 * retryBackoffMs));
+ assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
client.prepareResponse(fullAcknowledgeResponse(t2ip0, Errors.NONE));
networkClientDelegate.poll(time.timer(0));