This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3c7ed3333d2 KAFKA-18397: Added null check before sending background
event from ShareConsumeRequestManager. (#18419)
3c7ed3333d2 is described below
commit 3c7ed3333d24a0b75e29e4ee7214e13066f2050f
Author: ShivsundarR <[email protected]>
AuthorDate: Wed Jan 8 08:56:52 2025 -0500
KAFKA-18397: Added null check before sending background event from
ShareConsumeRequestManager. (#18419)
Reviewers: Andrew Schofield <[email protected]>
---
.../internals/ShareConsumeRequestManager.java | 6 +-
.../internals/ShareConsumeRequestManagerTest.java | 90 ++++++++++++++++++++++
2 files changed, 94 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index 6aa334d487b..a83e971600e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -1169,14 +1169,16 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
* signal the completion when all results are known.
*/
public void complete(TopicIdPartition partition, Acknowledgements acknowledgements, boolean isCommitAsync) {
- if (acknowledgements != null) {
+ if (!isCommitAsync && acknowledgements != null) {
result.put(partition, acknowledgements);
}
// For commitAsync, we do not wait for other results to complete, we prepare a background event
// for every ShareAcknowledgeResponse.
// For commitAsync, we send out a background event for every TopicIdPartition, so we use a singletonMap each time.
if (isCommitAsync) {
- maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition, acknowledgements));
+ if (acknowledgements != null) {
+ maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition, acknowledgements));
+ }
} else if (remainingResults != null &&
remainingResults.decrementAndGet() == 0) {
maybeSendShareAcknowledgeCommitCallbackEvent(result);
future.ifPresent(future -> future.complete(result));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 1b1ed587203..640eadc0e77 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -96,8 +96,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -548,6 +550,89 @@ public class ShareConsumeRequestManagerTest {
completedAcknowledgements.clear();
}
+ @Test
+ public void testResultHandlerOnCommitAsync() {
+ buildRequestManager();
+ // Enabling the config so that background event is sent when the
acknowledgement response is received.
+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+ Acknowledgements acknowledgements = Acknowledgements.empty();
+ acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+ acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+ acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+ ShareConsumeRequestManager.ResultHandler resultHandler =
shareConsumeRequestManager.buildResultHandler(null, Optional.empty());
+
+ // Passing null acknowledgements should mean we do not send the
background event at all.
+ resultHandler.complete(tip0, null, true);
+ assertEquals(0, completedAcknowledgements.size());
+
+ // Setting isCommitAsync to false should still not send any background
event
+ // as we have initialized remainingResults to null.
+ resultHandler.complete(tip0, acknowledgements, false);
+ assertEquals(0, completedAcknowledgements.size());
+
+ // Sending non-null acknowledgements means we do send the background
event
+ resultHandler.complete(tip0, acknowledgements, true);
+ assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
+ }
+
+ @Test
+ public void testResultHandlerOnCommitSync() {
+ buildRequestManager();
+ // Enabling the config so that background event is sent when the
acknowledgement response is received.
+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
+
+ Acknowledgements acknowledgements = Acknowledgements.empty();
+ acknowledgements.add(1L, AcknowledgeType.ACCEPT);
+ acknowledgements.add(2L, AcknowledgeType.ACCEPT);
+ acknowledgements.add(3L, AcknowledgeType.REJECT);
+
+ final CompletableFuture<Map<TopicIdPartition, Acknowledgements>>
future = new CompletableFuture<>();
+
+ // Initializing resultCount to 3.
+ AtomicInteger resultCount = new AtomicInteger(3);
+
+ ShareConsumeRequestManager.ResultHandler resultHandler =
shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
+
+ // We only send the background event after all results have been
completed.
+ resultHandler.complete(tip0, acknowledgements, false);
+ assertEquals(0, completedAcknowledgements.size());
+ assertFalse(future.isDone());
+
+ resultHandler.complete(t2ip0, null, false);
+ assertEquals(0, completedAcknowledgements.size());
+ assertFalse(future.isDone());
+
+ // After third response is received, we send the background event.
+ resultHandler.complete(tip1, acknowledgements, false);
+ assertEquals(1, completedAcknowledgements.size());
+ assertEquals(2, completedAcknowledgements.get(0).size());
+ assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
+ assertEquals(3, completedAcknowledgements.get(0).get(tip1).size());
+ assertTrue(future.isDone());
+ }
+
+ @Test
+ public void testResultHandlerCompleteIfEmpty() {
+ buildRequestManager();
+
+ final CompletableFuture<Map<TopicIdPartition, Acknowledgements>>
future = new CompletableFuture<>();
+
+ // Initializing resultCount to 1.
+ AtomicInteger resultCount = new AtomicInteger(1);
+
+ ShareConsumeRequestManager.ResultHandler resultHandler =
shareConsumeRequestManager.buildResultHandler(resultCount, Optional.of(future));
+
+ resultHandler.completeIfEmpty();
+ assertFalse(future.isDone());
+
+ resultCount.decrementAndGet();
+
+ resultHandler.completeIfEmpty();
+ assertTrue(future.isDone());
+ }
+
@Test
public void testBatchingAcknowledgeRequestStates() {
buildRequestManager();
@@ -1730,6 +1815,11 @@ public class ShareConsumeRequestManagerTest {
return pollResult.unsentRequests.size();
}
+ public ResultHandler buildResultHandler(final AtomicInteger remainingResults,
+ final Optional<CompletableFuture<Map<TopicIdPartition, Acknowledgements>>> future) {
+ return new ResultHandler(remainingResults, future);
+ }
+
public Tuple<AcknowledgeRequestState> requestStates(int nodeId) {
return super.requestStates(nodeId);
}