This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c7ed3333d2 KAFKA-18397: Added null check before sending background 
event from ShareConsumeRequestManager. (#18419)
3c7ed3333d2 is described below

commit 3c7ed3333d24a0b75e29e4ee7214e13066f2050f
Author: ShivsundarR <[email protected]>
AuthorDate: Wed Jan 8 08:56:52 2025 -0500

    KAFKA-18397: Added null check before sending background event from 
ShareConsumeRequestManager. (#18419)
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../internals/ShareConsumeRequestManager.java      |  6 +-
 .../internals/ShareConsumeRequestManagerTest.java  | 90 ++++++++++++++++++++++
 2 files changed, 94 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 6aa334d487b..a83e971600e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -1169,14 +1169,16 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
          * signal the completion when all results are known.
          */
         public void complete(TopicIdPartition partition, Acknowledgements 
acknowledgements, boolean isCommitAsync) {
-            if (acknowledgements != null) {
+            if (!isCommitAsync && acknowledgements != null) {
                 result.put(partition, acknowledgements);
             }
             // For commitAsync, we do not wait for other results to complete, 
we prepare a background event
             // for every ShareAcknowledgeResponse.
             // For commitAsync, we send out a background event for every 
TopicIdPartition, so we use a singletonMap each time.
             if (isCommitAsync) {
-                
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition,
 acknowledgements));
+                if (acknowledgements != null) {
+                    
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition,
 acknowledgements));
+                }
             } else if (remainingResults != null && 
remainingResults.decrementAndGet() == 0) {
                 maybeSendShareAcknowledgeCommitCallbackEvent(result);
                 future.ifPresent(future -> future.complete(result));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 1b1ed587203..640eadc0e77 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -96,8 +96,10 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -548,6 +550,89 @@ public class ShareConsumeRequestManagerTest {
         completedAcknowledgements.clear();
     }
 
+    @Test
+    public void testResultHandlerOnCommitAsync() {
+        buildRequestManager();
+        // Enabling the config so that background event is sent when the 
acknowledgement response is received.
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+        ShareConsumeRequestManager.ResultHandler resultHandler = 
shareConsumeRequestManager.buildResultHandler(null, Optional.empty());
+
+        // Passing null acknowledgements should mean we do not send the 
background event at all.
+        resultHandler.complete(tip0, null, true);
+        assertEquals(0, completedAcknowledgements.size());
+
+        // Setting isCommitAsync to false should still not send any background 
event
+        // as we have initialized remainingResults to null.
+        resultHandler.complete(tip0, acknowledgements, false);
+        assertEquals(0, completedAcknowledgements.size());
+
+        // Sending non-null acknowledgements means we do send the background 
event
+        resultHandler.complete(tip0, acknowledgements, true);
+        assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
+    }
+
+    @Test
+    public void testResultHandlerOnCommitSync() {
+        buildRequestManager();
+        // Enabling the config so that background event is sent when the 
acknowledgement response is received.
+        
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+        Acknowledgements acknowledgements = Acknowledgements.empty();
+        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+        acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+        final CompletableFuture<Map<TopicIdPartition, Acknowledgements>> 
future = new CompletableFuture<>();
+
+        // Initializing resultCount to 3.
+        AtomicInteger resultCount = new AtomicInteger(3);
+
+        ShareConsumeRequestManager.ResultHandler resultHandler = 
shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
+
+        // We only send the background event after all results have been 
completed.
+        resultHandler.complete(tip0, acknowledgements, false);
+        assertEquals(0, completedAcknowledgements.size());
+        assertFalse(future.isDone());
+
+        resultHandler.complete(t2ip0, null, false);
+        assertEquals(0, completedAcknowledgements.size());
+        assertFalse(future.isDone());
+
+        // After third response is received, we send the background event.
+        resultHandler.complete(tip1, acknowledgements, false);
+        assertEquals(1, completedAcknowledgements.size());
+        assertEquals(2, completedAcknowledgements.get(0).size());
+        assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
+        assertEquals(3, completedAcknowledgements.get(0).get(tip1).size());
+        assertTrue(future.isDone());
+    }
+
+    @Test
+    public void testResultHandlerCompleteIfEmpty() {
+        buildRequestManager();
+
+        final CompletableFuture<Map<TopicIdPartition, Acknowledgements>> 
future = new CompletableFuture<>();
+
+        // Initializing resultCount to 1.
+        AtomicInteger resultCount = new AtomicInteger(1);
+
+        ShareConsumeRequestManager.ResultHandler resultHandler = 
shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
+
+        resultHandler.completeIfEmpty();
+        assertFalse(future.isDone());
+
+        resultCount.decrementAndGet();
+
+        resultHandler.completeIfEmpty();
+        assertTrue(future.isDone());
+    }
+
     @Test
     public void testBatchingAcknowledgeRequestStates() {
         buildRequestManager();
@@ -1730,6 +1815,11 @@ public class ShareConsumeRequestManagerTest {
             return pollResult.unsentRequests.size();
         }
 
+        public ResultHandler buildResultHandler(final AtomicInteger 
remainingResults,
+                                                final 
Optional<CompletableFuture<Map<TopicIdPartition, Acknowledgements>>> future) {
+            return new ResultHandler(remainingResults, future);
+        }
+
         public Tuple<AcknowledgeRequestState> requestStates(int nodeId) {
             return super.requestStates(nodeId);
         }

Reply via email to