This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7a31b9eae81 Add null check (#18119)
7a31b9eae81 is described below
commit 7a31b9eae8192dc14779d3e64587988b019cfb6e
Author: ShivsundarR <[email protected]>
AuthorDate: Tue Dec 10 12:38:25 2024 -0500
Add null check (#18119)
Reviewers: Andrew Schofield <[email protected]>
---
.../clients/consumer/internals/ShareConsumeRequestManager.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index eda01ee3599..6aa334d487b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -992,12 +992,12 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
ShareAcknowledgeRequest.Builder requestBuilder =
sessionHandler.newShareAcknowledgeBuilder(groupId, fetchConfig);
isProcessed = false;
+ Node nodeToSend = metadata.fetch().nodeById(nodeId);
if (requestBuilder == null) {
handleSessionErrorCode(Errors.SHARE_SESSION_NOT_FOUND);
return null;
- } else {
- Node nodeToSend = metadata.fetch().nodeById(nodeId);
+ } else if (nodeToSend != null) {
nodesWithPendingRequests.add(nodeId);
log.trace("Building acknowledgements to send : {}",
finalAcknowledgementsToSend);
@@ -1019,6 +1019,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
};
return unsentRequest.whenComplete(responseHandler);
}
+
+ return null;
}
int getInFlightAcknowledgementsCount(TopicIdPartition tip) {