This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8fde6dedea8 KAFKA-18155 : Fix bug in response handler for 
ShareAcknowledge (#18029)
8fde6dedea8 is described below

commit 8fde6dedea839709d8c83a4a0c32403c917ceab8
Author: ShivsundarR <[email protected]>
AuthorDate: Thu Dec 5 02:29:13 2024 -0500

    KAFKA-18155 : Fix bug in response handler for ShareAcknowledge (#18029)
    
    In the response handler for ShareAcknowledge, we are passing the 
clientResponse.receivedTimeMs() to the handler methods. But when there is a 
disconnect or when the response received is null, we should be passing the 
current time instead.
    
    This bug was causing the consumer to hang because the handler methods were 
not called on disconnect, and further requests were blocked waiting for that 
request to complete.
    
    Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal 
<[email protected]>, Manikumar Reddy <[email protected]>
---
 .../internals/ShareConsumeRequestManager.java      | 19 ++++++------
 .../internals/ShareConsumeRequestManagerTest.java  | 34 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 9 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 5aa0310fe20..a5eb91128d3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -996,14 +996,6 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
             nodesWithPendingRequests.add(nodeId);
             isProcessed = false;
 
-            BiConsumer<ClientResponse, Throwable> responseHandler = 
(clientResponse, error) -> {
-                if (error != null) {
-                    handleShareAcknowledgeFailure(nodeToSend, 
requestBuilder.data(), this, error, clientResponse.receivedTimeMs());
-                } else {
-                    handleShareAcknowledgeSuccess(nodeToSend, 
requestBuilder.data(), this, clientResponse, clientResponse.receivedTimeMs());
-                }
-            };
-
             if (requestBuilder == null) {
                 handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND);
                 return null;
@@ -1014,7 +1006,16 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 } else {
                     incompleteAcknowledgements.clear();
                 }
-                return new UnsentRequest(requestBuilder, 
Optional.of(nodeToSend)).whenComplete(responseHandler);
+
+                UnsentRequest unsentRequest = new 
UnsentRequest(requestBuilder, Optional.of(nodeToSend));
+                BiConsumer<ClientResponse, Throwable> responseHandler = 
(clientResponse, error) -> {
+                    if (error != null) {
+                        handleShareAcknowledgeFailure(nodeToSend, 
requestBuilder.data(), this, error, unsentRequest.handler().completionTimeMs());
+                    } else {
+                        handleShareAcknowledgeSuccess(nodeToSend, 
requestBuilder.data(), this, clientResponse, 
unsentRequest.handler().completionTimeMs());
+                    }
+                };
+                return unsentRequest.whenComplete(responseHandler);
             }
         }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 6897f796dea..59473ae9f8b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -368,6 +368,40 @@ public class ShareConsumeRequestManagerTest {
         completedAcknowledgements.clear();
     }
 
+    @Test
+    public void testServerDisconnectedOnShareAcknowledge() {
+        buildRequestManager();
+        // Enabling the config so that background event is sent when the 
acknowledgement response is received.
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        assignFromSubscribed(Collections.singleton(tp0));
+
+        // normal fetch
+        assertEquals(1, sendFetches());
+        assertFalse(shareConsumeRequestManager.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tip0, records, 
acquiredRecords, Errors.NONE));
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+        shareConsumeRequestManager.commitAsync(Collections.singletonMap(tip0, 
acknowledgements));
+
+        assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
+
+        client.prepareResponse(null, true);
+        networkClientDelegate.poll(time.timer(0));
+        assertTrue(shareConsumeRequestManager.hasCompletedFetches());
+
+        assertEquals(Collections.singletonMap(tip0, acknowledgements), 
completedAcknowledgements.get(0));
+        assertEquals(Errors.UNKNOWN_SERVER_ERROR, 
completedAcknowledgements.get(0).get(tip0).getAcknowledgeErrorCode());
+        completedAcknowledgements.clear();
+    }
+
     @Test
     public void testAcknowledgeOnClose() {
         buildRequestManager();

Reply via email to