This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7bcf919d88 [ISSUE #9128] Fix NPE when grpc client ack message
immediately after changing proxy (#9129)
7bcf919d88 is described below
commit 7bcf919d88731689c34b5c808598d40c9e33822f
Author: qianye <[email protected]>
AuthorDate: Tue Jan 14 16:01:28 2025 +0800
[ISSUE #9128] Fix NPE when grpc client ack message immediately after
changing proxy (#9129)
---
.../rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
index 4a5b9cfcd6..76019a1ca9 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
+import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
import org.apache.rocketmq.proxy.processor.BatchAckResult;
@@ -193,10 +194,12 @@ public class AckMessageActivity extends AbstractMessingActivity {
protected String getHandleString(ProxyContext ctx, String group, AckMessageRequest request, AckMessageEntry ackMessageEntry) {
String handleString = ackMessageEntry.getReceiptHandle();
-
- MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
- if (messageReceiptHandle != null) {
- handleString = messageReceiptHandle.getReceiptHandleStr();
+ GrpcClientChannel channel = grpcChannelManager.getChannel(ctx.getClientID());
+ if (channel != null) {
+ MessageReceiptHandle messageReceiptHandle = messagingProcessor.removeReceiptHandle(ctx, channel, group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
+ if (messageReceiptHandle != null) {
+ handleString = messageReceiptHandle.getReceiptHandleStr();
+ }
}
return handleString;
}