This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new be1b7d2a2 [ISSUE #5313] Optimize ReceiptHandleProcessor and GrpcChannelManager (#5275)
be1b7d2a2 is described below

commit be1b7d2a261e5a4fc876dcf5d3fcb699106bbfd6
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Oct 14 16:53:41 2022 +0800

    [ISSUE #5313] Optimize ReceiptHandleProcessor and GrpcChannelManager (#5275)
    
    * [ISSUE #5313] Optimize proxy module methods and interfaces
    
    Use channel to manage MessageReceiptHandle
    
    Remove group in GrpcChannelManager
    
    * [ISSUE #5313] Add get method for ReceiptHandleGroup
---
 .../rocketmq/broker/client/ConsumerManager.java    |   8 +
 .../processor/ReplyMessageProcessorTest.java       |   2 +-
 .../rocketmq/proxy/common/ContextVariable.java     |   1 +
 .../apache/rocketmq/proxy/common/ProxyContext.java |  10 +
 .../rocketmq/proxy/common/ReceiptHandleGroup.java  |  24 ++
 .../proxy/grpc/v2/channel/GrpcChannelManager.java  |  34 +--
 .../proxy/grpc/v2/channel/GrpcClientChannel.java   |  20 +-
 .../proxy/grpc/v2/client/ClientActivity.java       |  13 +-
 .../proxy/grpc/v2/consumer/AckMessageActivity.java |   2 +-
 .../consumer/ChangeInvisibleDurationActivity.java  |   2 +-
 .../grpc/v2/consumer/ReceiveMessageActivity.java   |   2 +-
 .../v2/producer/ForwardMessageToDLQActivity.java   |   2 +-
 .../rocketmq/proxy/processor/ClientProcessor.java  |   4 +-
 .../proxy/processor/DefaultMessagingProcessor.java |   4 +-
 .../proxy/processor/MessagingProcessor.java        |   2 +-
 .../proxy/processor/ReceiptHandleProcessor.java    |  35 ++-
 .../proxy/common/ReceiptHandleGroupTest.java       |  79 ++++++
 .../proxy/grpc/v2/client/ClientActivityTest.java   |   1 -
 .../ChangeInvisibleDurationActivityTest.java       |   2 +-
 .../producer/ForwardMessageToDLQActivityTest.java  |   2 +-
 .../processor/ReceiptHandleProcessorTest.java      | 312 ++++++++++++++++++---
 .../mqclient/ProxyClientRemotingProcessorTest.java |   2 +-
 22 files changed, 451 insertions(+), 112 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index ee11329e4..50c3729ba 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -65,6 +65,14 @@ public class ConsumerManager {
         return null;
     }
 
+    public ClientChannelInfo findChannel(final String group, final Channel 
channel) {
+        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+        if (consumerGroupInfo != null) {
+            return consumerGroupInfo.findChannel(channel);
+        }
+        return null;
+    }
+
     public SubscriptionData findSubscriptionData(final String group, final 
String topic) {
         ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
         if (consumerGroupInfo != null) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
index e604b6336..899105873 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
@@ -96,7 +96,7 @@ public class ReplyMessageProcessorTest {
         
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
         brokerController.getProducerManager().registerProducer(group, 
clientInfo);
         final RemotingCommand request = 
createSendMessageRequestHeaderCommand(RequestCode.SEND_REPLY_MESSAGE);
-        
when(brokerController.getBroker2Client().callClient(any(Channel.class), 
any(RemotingCommand.class))).thenReturn(createResponse(ResponseCode.SUCCESS, 
request));
+        when(brokerController.getBroker2Client().callClient(any(), 
any(RemotingCommand.class))).thenReturn(createResponse(ResponseCode.SUCCESS, 
request));
         RemotingCommand responseToReturn = 
replyMessageProcessor.processRequest(handlerContext, request);
         assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
         
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java
index dcfc52909..00b3e76c7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ContextVariable.java
@@ -21,6 +21,7 @@ public class ContextVariable {
     public static final String REMOTE_ADDRESS = "remote-address";
     public static final String LOCAL_ADDRESS = "local-address";
     public static final String CLIENT_ID = "client-id";
+    public static final String CHANNEL = "channel";
     public static final String LANGUAGE = "language";
     public static final String CLIENT_VERSION = "client-version";
     public static final String REMAINING_MS = "remaining-ms";
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
index 6a35993fe..8fb9f4d53 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ProxyContext.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.proxy.common;
 
+import io.netty.channel.Channel;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -76,6 +77,15 @@ public class ProxyContext {
         return this.getVal(ContextVariable.CLIENT_ID);
     }
 
+    public ProxyContext setChannel(Channel channel) {
+        this.withVal(ContextVariable.CHANNEL, channel);
+        return this;
+    }
+
+    public Channel getChannel() {
+        return this.getVal(ContextVariable.CHANNEL);
+    }
+
     public ProxyContext setLanguage(String language) {
         this.withVal(ContextVariable.LANGUAGE, language);
         return this;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index 07d32445f..05867c334 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -100,6 +100,30 @@ public class ReceiptHandleGroup {
         return this.receiptHandleMap.isEmpty();
     }
 
+    public MessageReceiptHandle get(String msgID, String handle) {
+        Map<String, HandleData> handleMap = this.receiptHandleMap.get(msgID);
+        if (handleMap == null) {
+            return null;
+        }
+        long timeout = 
ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
+        AtomicReference<MessageReceiptHandle> res = new AtomicReference<>();
+        handleMap.computeIfPresent(handle, (handleKey, handleData) -> {
+            if (!handleData.lock(timeout)) {
+                throw new 
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, "try to get handle 
failed");
+            }
+            try {
+                if (handleData.needRemove) {
+                    return null;
+                }
+                res.set(handleData.messageReceiptHandle);
+            } finally {
+                handleData.unlock();
+            }
+            return handleData;
+        });
+        return res.get();
+    }
+
     public MessageReceiptHandle remove(String msgID, String handle) {
         Map<String, HandleData> handleMap = this.receiptHandleMap.get(msgID);
         if (handleMap == null) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
index 57a7b1104..c25727743 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcChannelManager.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.proxy.grpc.v2.channel;
 
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,7 +25,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.proxy.common.ProxyContext;
@@ -38,7 +36,7 @@ import 
org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
 
 public class GrpcChannelManager implements StartAndShutdown {
     private final ProxyRelayService proxyRelayService;
-    protected final ConcurrentMap<String /* group */, Map<String, 
GrpcClientChannel>/* clientId */> groupClientIdChannelMap = new 
ConcurrentHashMap<>();
+    protected final ConcurrentMap<String, GrpcClientChannel> 
clientIdChannelMap = new ConcurrentHashMap<>();
 
     protected final AtomicLong nonceIdGenerator = new AtomicLong(0);
     protected final ConcurrentMap<String /* nonce */, ResultFuture> 
resultNonceFutureMap = new ConcurrentHashMap<>();
@@ -58,35 +56,17 @@ public class GrpcChannelManager implements StartAndShutdown 
{
         );
     }
 
-    public GrpcClientChannel createChannel(ProxyContext ctx, String group, 
String clientId) {
-        this.groupClientIdChannelMap.compute(group, (groupKey, clientIdMap) -> 
{
-            if (clientIdMap == null) {
-                clientIdMap = new ConcurrentHashMap<>();
-            }
-            clientIdMap.computeIfAbsent(clientId, clientIdKey -> new 
GrpcClientChannel(proxyRelayService, this, ctx, group, clientId));
-            return clientIdMap;
-        });
-        return getChannel(group, clientId);
+    public GrpcClientChannel createChannel(ProxyContext ctx, String clientId) {
+        return this.clientIdChannelMap.computeIfAbsent(clientId,
+            k -> new GrpcClientChannel(proxyRelayService, this, ctx, 
clientId));
     }
 
-    public GrpcClientChannel getChannel(String group, String clientId) {
-        Map<String, GrpcClientChannel> clientIdChannelMap = 
this.groupClientIdChannelMap.get(group);
-        if (clientIdChannelMap == null) {
-            return null;
-        }
+    public GrpcClientChannel getChannel(String clientId) {
         return clientIdChannelMap.get(clientId);
     }
 
-    public GrpcClientChannel removeChannel(String group, String clientId) {
-        AtomicReference<GrpcClientChannel> channelRef = new 
AtomicReference<>();
-        this.groupClientIdChannelMap.computeIfPresent(group, (groupKey, 
clientIdMap) -> {
-            channelRef.set(clientIdMap.remove(clientId));
-            if (clientIdMap.isEmpty()) {
-                return null;
-            }
-            return clientIdMap;
-        });
-        return channelRef.get();
+    public GrpcClientChannel removeChannel(String clientId) {
+        return this.clientIdChannelMap.remove(clientId);
     }
 
     public <T> String addResponseFuture(CompletableFuture<ProxyRelayResult<T>> 
responseFuture) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
index 810534bd2..6459f8977 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/channel/GrpcClientChannel.java
@@ -47,33 +47,27 @@ import 
org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class GrpcClientChannel extends ProxyChannel {
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
-    protected static final String SEPARATOR = "@";
 
     private final GrpcChannelManager grpcChannelManager;
 
     private final AtomicReference<StreamObserver<TelemetryCommand>> 
telemetryCommandRef = new AtomicReference<>();
     private final Object telemetryWriteLock = new Object();
-    private final String group;
     private final String clientId;
 
     public GrpcClientChannel(ProxyRelayService proxyRelayService, 
GrpcChannelManager grpcChannelManager,
-        ProxyContext ctx,
-        String group, String clientId) {
-        super(proxyRelayService, null, new GrpcChannelId(group, clientId),
+        ProxyContext ctx, String clientId) {
+        super(proxyRelayService, null, new GrpcChannelId(clientId),
             ctx.getRemoteAddress(),
             ctx.getLocalAddress());
         this.grpcChannelManager = grpcChannelManager;
-        this.group = group;
         this.clientId = clientId;
     }
 
     protected static class GrpcChannelId implements ChannelId {
 
-        private final String group;
         private final String clientId;
 
-        public GrpcChannelId(String group, String clientId) {
-            this.group = group;
+        public GrpcChannelId(String clientId) {
             this.clientId = clientId;
         }
 
@@ -84,7 +78,7 @@ public class GrpcClientChannel extends ProxyChannel {
 
         @Override
         public String asLongText() {
-            return this.group + SEPARATOR + this.clientId;
+            return this.clientId;
         }
 
         @Override
@@ -95,7 +89,6 @@ public class GrpcClientChannel extends ProxyChannel {
             if (o instanceof GrpcChannelId) {
                 GrpcChannelId other = (GrpcChannelId) o;
                 return ComparisonChain.start()
-                    .compare(this.group, other.group)
                     .compare(this.clientId, other.clientId)
                     .result();
             }
@@ -184,10 +177,6 @@ public class GrpcClientChannel extends ProxyChannel {
         return CompletableFuture.completedFuture(null);
     }
 
-    public String getGroup() {
-        return group;
-    }
-
     public String getClientId() {
         return clientId;
     }
@@ -224,7 +213,6 @@ public class GrpcClientChannel extends ProxyChannel {
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
-            .add("group", group)
             .add("clientId", clientId)
             .add("remoteAddress", getRemoteAddress())
             .add("localAddress", getLocalAddress())
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index 2192014b5..8bef2c852 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -140,8 +140,7 @@ public class ClientActivity extends AbstractMessingActivity 
{
                 case PRODUCER:
                     for (Resource topic : 
clientSettings.getPublishing().getTopicsList()) {
                         String topicName = 
GrpcConverter.getInstance().wrapResourceWithNamespace(topic);
-                        // user topic name as producer group
-                        GrpcClientChannel channel = 
this.grpcChannelManager.removeChannel(topicName, clientId);
+                        GrpcClientChannel channel = 
this.grpcChannelManager.removeChannel(clientId);
                         if (channel != null) {
                             ClientChannelInfo clientChannelInfo = new 
ClientChannelInfo(channel, clientId, languageCode, 
MQVersion.Version.V5_0_0.ordinal());
                             this.messagingProcessor.unRegisterProducer(ctx, 
topicName, clientChannelInfo);
@@ -152,7 +151,7 @@ public class ClientActivity extends AbstractMessingActivity 
{
                 case SIMPLE_CONSUMER:
                     validateConsumerGroup(request.getGroup());
                     String consumerGroup = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
-                    GrpcClientChannel channel = 
this.grpcChannelManager.removeChannel(consumerGroup, clientId);
+                    GrpcClientChannel channel = 
this.grpcChannelManager.removeChannel(clientId);
                     if (channel != null) {
                         ClientChannelInfo clientChannelInfo = new 
ClientChannelInfo(channel, clientId, languageCode, 
MQVersion.Version.V5_0_0.ordinal());
                         this.messagingProcessor.unRegisterConsumer(ctx, 
consumerGroup, clientChannelInfo);
@@ -281,7 +280,7 @@ public class ClientActivity extends AbstractMessingActivity 
{
         String clientId = ctx.getClientID();
         LanguageCode languageCode = LanguageCode.valueOf(ctx.getLanguage());
 
-        GrpcClientChannel channel = this.grpcChannelManager.createChannel(ctx, 
topicName, clientId);
+        GrpcClientChannel channel = this.grpcChannelManager.createChannel(ctx, 
clientId);
         // use topic name as producer group
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, 
clientId, languageCode, parseClientVersion(ctx.getClientVersion()));
         this.messagingProcessor.registerProducer(ctx, topicName, 
clientChannelInfo);
@@ -296,7 +295,7 @@ public class ClientActivity extends AbstractMessingActivity 
{
         String clientId = ctx.getClientID();
         LanguageCode languageCode = LanguageCode.valueOf(ctx.getLanguage());
 
-        GrpcClientChannel channel = this.grpcChannelManager.createChannel(ctx, 
consumerGroup, clientId);
+        GrpcClientChannel channel = this.grpcChannelManager.createChannel(ctx, 
clientId);
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, 
clientId, languageCode, parseClientVersion(ctx.getClientVersion()));
 
         this.messagingProcessor.registerConsumer(
@@ -420,7 +419,7 @@ public class ClientActivity extends AbstractMessingActivity 
{
                 }
                 if (args[0] instanceof ClientChannelInfo) {
                     ClientChannelInfo clientChannelInfo = (ClientChannelInfo) 
args[0];
-                    grpcChannelManager.removeChannel(group, 
clientChannelInfo.getClientId());
+                    
grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
                     
grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId());
                 }
             }
@@ -437,7 +436,7 @@ public class ClientActivity extends AbstractMessingActivity 
{
         @Override
         public void handle(ProducerGroupEvent event, String group, 
ClientChannelInfo clientChannelInfo) {
             if (event == ProducerGroupEvent.CLIENT_UNREGISTER) {
-                grpcChannelManager.removeChannel(group, 
clientChannelInfo.getClientId());
+                
grpcChannelManager.removeChannel(clientChannelInfo.getClientId());
                 
grpcClientSettingsManager.removeClientSettings(clientChannelInfo.getClientId());
             }
         }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
index fa2241cb3..fb31a6062 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
@@ -98,7 +98,7 @@ public class AckMessageActivity extends 
AbstractMessingActivity {
             String handleString = ackMessageEntry.getReceiptHandle();
 
             String group = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
-            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(ctx.getClientID(), group, 
ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
+            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
 group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
             if (messageReceiptHandle != null) {
                 handleString = messageReceiptHandle.getReceiptHandleStr();
             }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
index 680364072..0f33cc7aa 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
@@ -55,7 +55,7 @@ public class ChangeInvisibleDurationActivity extends 
AbstractMessingActivity {
             ReceiptHandle receiptHandle = 
ReceiptHandle.decode(request.getReceiptHandle());
             String group = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
 
-            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(ctx.getClientID(), group, 
request.getMessageId(), receiptHandle.getReceiptHandle());
+            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
 group, request.getMessageId(), receiptHandle.getReceiptHandle());
             if (messageReceiptHandle != null) {
                 receiptHandle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
             }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index 49763dcdb..76fca3bba 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -124,7 +124,7 @@ public class ReceiveMessageActivity extends 
AbstractMessingActivity {
                                 MessageReceiptHandle messageReceiptHandle =
                                     new MessageReceiptHandle(group, topic, 
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
                                         messageExt.getQueueOffset(), 
messageExt.getReconsumeTimes());
-                                
receiptHandleProcessor.addReceiptHandle(ctx.getClientID(), group, 
messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
+                                
receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
 group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
                             }
                         }
                     }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
index dec52f3c2..789927d69 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
@@ -48,7 +48,7 @@ public class ForwardMessageToDLQActivity extends 
AbstractMessingActivity {
 
             String group = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
             String handleString = request.getReceiptHandle();
-            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(ctx.getClientID(), group, 
request.getMessageId(), request.getReceiptHandle());
+            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
 group, request.getMessageId(), request.getReceiptHandle());
             if (messageReceiptHandle != null) {
                 handleString = messageReceiptHandle.getReceiptHandleStr();
             }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
index 922528982..26d13ae18 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ClientProcessor.java
@@ -88,9 +88,9 @@ public class ClientProcessor extends AbstractProcessor {
     public ClientChannelInfo findConsumerChannel(
         ProxyContext ctx,
         String consumerGroup,
-        String clientId
+        Channel channel
     ) {
-        return 
this.serviceManager.getConsumerManager().findChannel(consumerGroup, clientId);
+        return 
this.serviceManager.getConsumerManager().findChannel(consumerGroup, channel);
     }
 
     public void unRegisterConsumer(
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 5234237a2..00ef17f9a 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -246,8 +246,8 @@ public class DefaultMessagingProcessor extends 
AbstractStartAndShutdown implemen
     }
 
     @Override
-    public ClientChannelInfo findConsumerChannel(ProxyContext ctx, String 
consumerGroup, String clientId) {
-        return this.clientProcessor.findConsumerChannel(ctx, consumerGroup, 
clientId);
+    public ClientChannelInfo findConsumerChannel(ProxyContext ctx, String 
consumerGroup, Channel channel) {
+        return this.clientProcessor.findConsumerChannel(ctx, consumerGroup, 
channel);
     }
 
     @Override
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index e0ae71471..f366d1357 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -266,7 +266,7 @@ public interface MessagingProcessor extends 
StartAndShutdown {
     ClientChannelInfo findConsumerChannel(
         ProxyContext ctx,
         String consumerGroup,
-        String clientId
+        Channel channel
     );
 
     void unRegisterConsumer(
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 15c4385fd..d1186cc87 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.proxy.processor;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.base.Stopwatch;
+import io.netty.channel.Channel;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -104,7 +105,7 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
                     }
                     if (args[0] instanceof ClientChannelInfo) {
                         ClientChannelInfo clientChannelInfo = 
(ClientChannelInfo) args[0];
-                        clearGroup(new 
ReceiptHandleGroupKey(clientChannelInfo.getClientId(), group));
+                        clearGroup(new 
ReceiptHandleGroupKey(clientChannelInfo.getChannel(), group));
                     }
                 }
             }
@@ -227,12 +228,12 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
     }
 
     protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) {
-        return 
this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), 
groupKey.group, groupKey.clientId) == null;
+        return 
this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), 
groupKey.group, groupKey.channel) == null;
     }
 
-    public void addReceiptHandle(String clientID, String group, String msgID, 
String receiptHandle,
+    public void addReceiptHandle(Channel channel, String group, String msgID, 
String receiptHandle,
         MessageReceiptHandle messageReceiptHandle) {
-        this.addReceiptHandle(new ReceiptHandleGroupKey(clientID, group), 
msgID, receiptHandle, messageReceiptHandle);
+        this.addReceiptHandle(new ReceiptHandleGroupKey(channel, group), 
msgID, receiptHandle, messageReceiptHandle);
     }
 
     protected void addReceiptHandle(ReceiptHandleGroupKey key, String msgID, 
String receiptHandle,
@@ -244,8 +245,8 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
             k -> new ReceiptHandleGroup()).put(msgID, receiptHandle, 
messageReceiptHandle);
     }
 
-    public MessageReceiptHandle removeReceiptHandle(String clientID, String 
group, String msgID, String receiptHandle) {
-        return this.removeReceiptHandle(new ReceiptHandleGroupKey(clientID, 
group), msgID, receiptHandle);
+    public MessageReceiptHandle removeReceiptHandle(Channel channel, String 
group, String msgID, String receiptHandle) {
+        return this.removeReceiptHandle(new ReceiptHandleGroupKey(channel, 
group), msgID, receiptHandle);
     }
 
     protected MessageReceiptHandle removeReceiptHandle(ReceiptHandleGroupKey 
key, String msgID, String receiptHandle) {
@@ -299,22 +300,26 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
     }
 
     public static class ReceiptHandleGroupKey {
-        private final String clientId;
-        private final String group;
+        protected final Channel channel;
+        protected final String group;
 
-        public ReceiptHandleGroupKey(String clientId, String group) {
-            this.clientId = clientId;
+        public ReceiptHandleGroupKey(Channel channel, String group) {
+            this.channel = channel;
             this.group = group;
         }
 
-        public String getClientId() {
-            return clientId;
+        protected String getChannelId() {
+            return channel.id().asLongText();
         }
 
         public String getGroup() {
             return group;
         }
 
+        public Channel getChannel() {
+            return channel;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {
@@ -324,18 +329,18 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
                 return false;
             }
             ReceiptHandleGroupKey key = (ReceiptHandleGroupKey) o;
-            return Objects.equal(clientId, key.clientId) && 
Objects.equal(group, key.group);
+            return Objects.equal(getChannelId(), key.getChannelId()) && 
Objects.equal(group, key.group);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hashCode(clientId, group);
+            return Objects.hashCode(getChannelId(), group);
         }
 
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(this)
-                .add("clientId", clientId)
+                .add("channelId", getChannelId())
                 .add("group", group)
                 .toString();
         }
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
index 6aca7ac6f..8bdff6566 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroupTest.java
@@ -33,6 +33,7 @@ import org.junit.Test;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -65,6 +66,84 @@ public class ReceiptHandleGroupTest extends InitConfigAndLoggerTest {
             .build().encode();
     }
 
+    @Test
+    public void testGetWhenComputeIfPresent() {
+        String handle1 = createHandle();
+        String handle2 = createHandle();
+        AtomicReference<MessageReceiptHandle> getHandleRef = new 
AtomicReference<>();
+
+        receiptHandleGroup.put(msgID, handle1, 
createMessageReceiptHandle(handle1, msgID));
+        CountDownLatch latch = new CountDownLatch(2);
+        Thread getThread = new Thread(() -> {
+            try {
+                latch.countDown();
+                latch.await();
+                getHandleRef.set(receiptHandleGroup.get(msgID, handle1));
+            } catch (Exception ignored) {
+            }
+        }, "getThread");
+        Thread computeThread = new Thread(() -> {
+            try {
+                receiptHandleGroup.computeIfPresent(msgID, handle1, 
messageReceiptHandle -> {
+                    try {
+                        latch.countDown();
+                        latch.await();
+                    } catch (Exception ignored) {
+                    }
+                    messageReceiptHandle.updateReceiptHandle(handle2);
+                    return 
FutureUtils.addExecutor(CompletableFuture.completedFuture(messageReceiptHandle),
 Executors.newCachedThreadPool());
+                });
+            } catch (Exception ignored) {
+            }
+        }, "computeThread");
+        getThread.start();
+        computeThread.start();
+
+        await().atMost(Duration.ofSeconds(1)).until(() -> getHandleRef.get() 
!= null);
+        assertEquals(handle2, getHandleRef.get().getReceiptHandleStr());
+        assertFalse(receiptHandleGroup.isEmpty());
+    }
+
+    @Test
+    public void testGetWhenComputeIfPresentReturnNull() {
+        String handle1 = createHandle();
+        AtomicBoolean getCalled = new AtomicBoolean(false);
+        AtomicReference<MessageReceiptHandle> getHandleRef = new 
AtomicReference<>();
+
+        receiptHandleGroup.put(msgID, handle1, 
createMessageReceiptHandle(handle1, msgID));
+        CountDownLatch latch = new CountDownLatch(2);
+        Thread getThread = new Thread(() -> {
+            try {
+                latch.countDown();
+                latch.await();
+                getHandleRef.set(receiptHandleGroup.get(msgID, handle1));
+                getCalled.set(true);
+            } catch (Exception ignored) {
+            }
+        }, "getThread");
+        Thread computeThread = new Thread(() -> {
+            try {
+                receiptHandleGroup.computeIfPresent(msgID, handle1, 
messageReceiptHandle -> {
+                    try {
+                        latch.countDown();
+                        latch.await();
+                    } catch (Exception ignored) {
+                    }
+                    return 
FutureUtils.addExecutor(CompletableFuture.completedFuture(null), 
Executors.newCachedThreadPool());
+                });
+            } catch (Exception ignored) {
+            }
+        }, "computeThread");
+        getThread.start();
+        computeThread.start();
+
+        await().atMost(Duration.ofSeconds(1)).until(getCalled::get);
+        assertNull(getHandleRef.get());
+        assertTrue(receiptHandleGroup.isEmpty());
+    }
+
+
+
     @Test
     public void testRemoveWhenComputeIfPresent() {
         String handle1 = createHandle();
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
index ea045774f..bfff5ee59 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
@@ -208,7 +208,6 @@ public class ClientActivityTest extends BaseActivityTest {
         GrpcClientChannel channel = (GrpcClientChannel) 
clientChannelInfo.getChannel();
         assertEquals(REMOTE_ADDR, channel.getRemoteAddress());
         assertEquals(LOCAL_ADDR, channel.getLocalAddress());
-        assertEquals(group, channel.getGroup());
     }
 
     @Test
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
index 012649465..a861e8c13 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
@@ -92,7 +92,7 @@ public class ChangeInvisibleDurationActivityTest extends BaseActivityTest {
         when(this.messagingProcessor.changeInvisibleTime(
             any(), receiptHandleCaptor.capture(), anyString(), anyString(), 
anyString(), invisibleTimeArgumentCaptor.capture()
         )).thenReturn(CompletableFuture.completedFuture(ackResult));
-        when(receiptHandleProcessor.removeReceiptHandle(anyString(), 
anyString(), anyString(), anyString()))
+        when(receiptHandleProcessor.removeReceiptHandle(any(), anyString(), 
anyString(), anyString()))
             .thenReturn(new MessageReceiptHandle("group", "topic", 0, 
savedHandleStr, "msgId", 0, 0));
 
         ChangeInvisibleDurationResponse response = 
this.changeInvisibleDurationActivity.changeInvisibleDuration(
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
index cd3c48e1a..5603d6958 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
@@ -75,7 +75,7 @@ public class ForwardMessageToDLQActivityTest extends BaseActivityTest {
             
.thenReturn(CompletableFuture.completedFuture(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
 "")));
 
         String savedHandleStr = buildReceiptHandle("topic", 
System.currentTimeMillis(),3000);
-        when(receiptHandleProcessor.removeReceiptHandle(anyString(), 
anyString(), anyString(), anyString()))
+        when(receiptHandleProcessor.removeReceiptHandle(any(), anyString(), 
anyString(), anyString()))
             .thenReturn(new MessageReceiptHandle("group", "topic", 0, 
savedHandleStr, "msgId", 0, 0));
 
         ForwardMessageToDeadLetterQueueResponse response = 
this.forwardMessageToDLQActivity.forwardMessageToDeadLetterQueue(
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index 93ab0210c..cc294e859 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -17,6 +17,19 @@
 
 package org.apache.rocketmq.proxy.processor;
 
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelMetadata;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelProgressivePromise;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoop;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import java.net.SocketAddress;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -82,6 +95,7 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             .commitLogOffset(0L)
             .build().encode();
         PROXY_CONTEXT.withVal(ContextVariable.CLIENT_ID, "channel-id");
+        PROXY_CONTEXT.withVal(ContextVariable.CHANNEL, new MockChannel());
         receiptHandleProcessor = new 
ReceiptHandleProcessor(messagingProcessor);
         
Mockito.doNothing().when(messagingProcessor).registerConsumerListener(Mockito.any(ConsumerIdsChangeListener.class));
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, receiptHandle, MESSAGE_ID, OFFSET,
@@ -90,10 +104,10 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
 
     @Test
     public void testAddReceiptHandle() {
-        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
         
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(new
 SubscriptionGroupConfig());
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
@@ -103,11 +117,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
     @Test
     public void testRenewReceiptHandle() {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
-        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         long newInvisibleTime = 2000L;
         ReceiptHandle newReceiptHandleClass = ReceiptHandle.builder()
             .startOffset(0L)
@@ -140,9 +154,9 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
     @Test
     public void testRenewExceedMaxRenewTimes() {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
-        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
 
         CompletableFuture<AckResult> ackResultFuture = new 
CompletableFuture<>();
         ackResultFuture.completeExceptionally(new MQClientException(0, 
"error"));
@@ -166,9 +180,9 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
 
     @Test
     public void testRenewWithInvalidHandle() {
-        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
 
         CompletableFuture<AckResult> ackResultFuture = new 
CompletableFuture<>();
         ackResultFuture.completeExceptionally(new 
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
@@ -190,9 +204,9 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
     @Test
     public void testRenewWithErrorThenOK() {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
-        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
 
         AtomicInteger count = new AtomicInteger(0);
         List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
@@ -261,9 +275,9 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             .build().encode();
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
-        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, 
newReceiptHandle, messageReceiptHandle);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
newReceiptHandle, messageReceiptHandle);
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), 
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), 
Mockito.anyLong()))
@@ -293,9 +307,9 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             .build().encode();
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
-        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, 
newReceiptHandle, messageReceiptHandle);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
newReceiptHandle, messageReceiptHandle);
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(null);
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), 
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), 
Mockito.anyLong()))
             .thenReturn(CompletableFuture.completedFuture(new AckResult()));
@@ -329,11 +343,11 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
             .build().encode();
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
-        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, 
newReceiptHandle, messageReceiptHandle);
+        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
newReceiptHandle, messageReceiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
-        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channelId))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+        Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.any(ReceiptHandle.class), Mockito.anyString(),
@@ -342,9 +356,9 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
 
     @Test
     public void testRemoveReceiptHandle() {
-        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
-        receiptHandleProcessor.removeReceiptHandle(channelId, GROUP, MSG_ID, 
receiptHandle);
+        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.removeReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
         receiptHandleProcessor.scheduleRenewTask();
@@ -355,9 +369,9 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
 
     @Test
     public void testClearGroup() {
-        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
-        receiptHandleProcessor.clearGroup(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channelId, GROUP));
+        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.clearGroup(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
         
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
         receiptHandleProcessor.scheduleRenewTask();
@@ -370,9 +384,241 @@ public class ReceiptHandleProcessorTest extends BaseProcessorTest {
     public void testClientOffline() {
         ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = 
ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
         Mockito.verify(messagingProcessor, 
Mockito.times(1)).registerConsumerListener(listenerArgumentCaptor.capture());
-        String channelId = PROXY_CONTEXT.getVal(ContextVariable.CLIENT_ID);
-        receiptHandleProcessor.addReceiptHandle(channelId, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
-        
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, 
GROUP, new ClientChannelInfo(null, channelId, LanguageCode.JAVA, 0));
+        Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, 
GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
         assertTrue(receiptHandleProcessor.receiptHandleGroupMap.isEmpty());
     }
+
+    class MockChannel implements Channel {
+        @Override
+        public ChannelId id() {
+            return new ChannelId() {
+                @Override
+                public String asShortText() {
+                    return "short";
+                }
+
+                @Override
+                public String asLongText() {
+                    return "long";
+                }
+
+                @Override
+                public int compareTo(ChannelId o) {
+                    return 1;
+                }
+            };
+        }
+
+        @Override
+        public EventLoop eventLoop() {
+            return null;
+        }
+
+        @Override
+        public Channel parent() {
+            return null;
+        }
+
+        @Override
+        public ChannelConfig config() {
+            return null;
+        }
+
+        @Override
+        public boolean isOpen() {
+            return false;
+        }
+
+        @Override
+        public boolean isRegistered() {
+            return false;
+        }
+
+        @Override
+        public boolean isActive() {
+            return false;
+        }
+
+        @Override
+        public ChannelMetadata metadata() {
+            return null;
+        }
+
+        @Override
+        public SocketAddress localAddress() {
+            return null;
+        }
+
+        @Override
+        public SocketAddress remoteAddress() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture closeFuture() {
+            return null;
+        }
+
+        @Override
+        public boolean isWritable() {
+            return false;
+        }
+
+        @Override
+        public long bytesBeforeUnwritable() {
+            return 0;
+        }
+
+        @Override
+        public long bytesBeforeWritable() {
+            return 0;
+        }
+
+        @Override
+        public Unsafe unsafe() {
+            return null;
+        }
+
+        @Override
+        public ChannelPipeline pipeline() {
+            return null;
+        }
+
+        @Override
+        public ByteBufAllocator alloc() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture bind(SocketAddress localAddress) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress, 
SocketAddress localAddress) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture disconnect() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture close() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture deregister() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture bind(SocketAddress localAddress, ChannelPromise 
promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress, 
ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture connect(SocketAddress remoteAddress, 
SocketAddress localAddress, ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture disconnect(ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture close(ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture deregister(ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public Channel read() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture write(Object msg) {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture write(Object msg, ChannelPromise promise) {
+            return null;
+        }
+
+        @Override
+        public Channel flush() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) 
{
+            return null;
+        }
+
+        @Override
+        public ChannelFuture writeAndFlush(Object msg) {
+            return null;
+        }
+
+        @Override
+        public ChannelPromise newPromise() {
+            return null;
+        }
+
+        @Override
+        public ChannelProgressivePromise newProgressivePromise() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture newSucceededFuture() {
+            return null;
+        }
+
+        @Override
+        public ChannelFuture newFailedFuture(Throwable cause) {
+            return null;
+        }
+
+        @Override
+        public ChannelPromise voidPromise() {
+            return null;
+        }
+
+        @Override
+        public <T> Attribute<T> attr(AttributeKey<T> key) {
+            return null;
+        }
+
+        @Override
+        public <T> boolean hasAttr(AttributeKey<T> key) {
+            return false;
+        }
+
+        @Override
+        public int compareTo(Channel o) {
+            return 1;
+        }
+    }
 }
\ No newline at end of file
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
index c365aa9d0..0b5542fbe 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/ProxyClientRemotingProcessorTest.java
@@ -75,7 +75,7 @@ public class ProxyClientRemotingProcessorTest {
                 proxyRelayResultFuture));
 
         GrpcClientChannel grpcClientChannel = new 
GrpcClientChannel(proxyRelayService, null,
-            
ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"),
 "group", "clientId");
+            
ProxyContext.create().setRemoteAddress("127.0.0.1:8888").setLocalAddress("127.0.0.1:10911"),
 "clientId");
         when(producerManager.getAvailableChannel(anyString()))
             .thenReturn(grpcClientChannel);
 

Reply via email to