This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8fde6dedea8 KAFKA-18155 : Fix bug in response handler for
ShareAcknowledge (#18029)
8fde6dedea8 is described below
commit 8fde6dedea839709d8c83a4a0c32403c917ceab8
Author: ShivsundarR <[email protected]>
AuthorDate: Thu Dec 5 02:29:13 2024 -0500
KAFKA-18155 : Fix bug in response handler for ShareAcknowledge (#18029)
In the response handler for ShareAcknowledge, we were passing
clientResponse.receivedTimeMs() to the handler methods. But when there is a
disconnect or when the received response is null, we should pass the
current time instead.
This bug was causing the consumer to hang, as it did not call the handler
methods on disconnect, and further requests were blocked waiting for its
completion.
Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>, Manikumar Reddy <[email protected]>
---
.../internals/ShareConsumeRequestManager.java | 19 ++++++------
.../internals/ShareConsumeRequestManagerTest.java | 34 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 9 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 5aa0310fe20..a5eb91128d3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -996,14 +996,6 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
nodesWithPendingRequests.add(nodeId);
isProcessed = false;
- BiConsumer<ClientResponse, Throwable> responseHandler =
(clientResponse, error) -> {
- if (error != null) {
- handleShareAcknowledgeFailure(nodeToSend,
requestBuilder.data(), this, error, clientResponse.receivedTimeMs());
- } else {
- handleShareAcknowledgeSuccess(nodeToSend,
requestBuilder.data(), this, clientResponse, clientResponse.receivedTimeMs());
- }
- };
-
if (requestBuilder == null) {
handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND);
return null;
@@ -1014,7 +1006,16 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
} else {
incompleteAcknowledgements.clear();
}
- return new UnsentRequest(requestBuilder,
Optional.of(nodeToSend)).whenComplete(responseHandler);
+
+ UnsentRequest unsentRequest = new
UnsentRequest(requestBuilder, Optional.of(nodeToSend));
+ BiConsumer<ClientResponse, Throwable> responseHandler =
(clientResponse, error) -> {
+ if (error != null) {
+ handleShareAcknowledgeFailure(nodeToSend,
requestBuilder.data(), this, error, unsentRequest.handler().completionTimeMs());
+ } else {
+ handleShareAcknowledgeSuccess(nodeToSend,
requestBuilder.data(), this, clientResponse,
unsentRequest.handler().completionTimeMs());
+ }
+ };
+ return unsentRequest.whenComplete(responseHandler);
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 6897f796dea..59473ae9f8b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -368,6 +368,40 @@ public class ShareConsumeRequestManagerTest {
completedAcknowledgements.clear();
}
+ @Test
+ public void testServerDisconnectedOnShareAcknowledge() {
+ buildRequestManager();
+ // Enabling the config so that background event is sent when the
acknowledgement response is received.
+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+ assignFromSubscribed(Collections.singleton(tp0));
+
+ // normal fetch
+ assertEquals(1, sendFetches());
+ assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+ client.prepareResponse(fullFetchResponse(tip0, records,
acquiredRecords, Errors.NONE));
+ networkClientDelegate.poll(time.timer(0));
+ assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+ Acknowledgements acknowledgements = Acknowledgements.empty();
+ acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+ acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+ acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+ shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0,
acknowledgements));
+
+ assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+ client.prepareResponse(null, true);
+ networkClientDelegate.poll(time.timer(0));
+ assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+ assertEquals(Collections.singletonMap(tip0, acknowledgements),
completedAcknowledgements.get(0));
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR,
completedAcknowledgements.get(0).get(tip0).getAcknowledgeErrorCode());
+ completedAcknowledgements.clear();
+ }
+
@Test
public void testAcknowledgeOnClose() {
buildRequestManager();