This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a325d144b2 [ISSUE #6858] passing through ProxyContext for future expansion (#6859)
a325d144b2 is described below

commit a325d144b24a1acdc92a5ac308865080532325d9
Author: lk <[email protected]>
AuthorDate: Tue Jun 6 17:07:06 2023 +0800

    [ISSUE #6858] passing through ProxyContext for future expansion (#6859)
---
 .../proxy/grpc/v2/client/ClientActivity.java       |  2 +-
 .../proxy/grpc/v2/consumer/AckMessageActivity.java |  2 +-
 .../consumer/ChangeInvisibleDurationActivity.java  |  2 +-
 .../grpc/v2/consumer/ReceiveMessageActivity.java   |  2 +-
 .../v2/producer/ForwardMessageToDLQActivity.java   |  2 +-
 .../proxy/grpc/v2/route/RouteActivity.java         |  2 +-
 .../proxy/processor/ConsumerProcessor.java         | 20 ++++++------
 .../proxy/processor/DefaultMessagingProcessor.java |  4 +--
 .../proxy/processor/ProducerProcessor.java         |  9 ++---
 .../proxy/processor/ReceiptHandleProcessor.java    | 14 ++++----
 .../proxy/processor/TransactionProcessor.java      |  3 +-
 .../remoting/activity/SendMessageActivity.java     |  2 +-
 .../proxy/service/ClusterServiceManager.java       |  3 +-
 .../service/message/ClusterMessageService.java     | 20 ++++++------
 .../service/metadata/ClusterMetadataService.java   |  7 ++--
 .../service/metadata/LocalMetadataService.java     |  5 +--
 .../proxy/service/metadata/MetadataService.java    |  5 +--
 .../service/relay/AbstractProxyRelayService.java   |  1 +
 .../service/route/ClusterTopicRouteService.java    | 17 +++++-----
 .../service/route/LocalTopicRouteService.java      | 13 ++++----
 .../proxy/service/route/TopicRouteService.java     | 11 ++++---
 .../sysmessage/AbstractSystemMessageSyncer.java    |  3 +-
 .../transaction/AbstractTransactionService.java    |  8 ++---
 .../transaction/ClusterTransactionService.java     | 21 ++++++------
 .../transaction/LocalTransactionService.java       |  9 ++---
 .../service/transaction/TransactionService.java    | 14 ++++----
 .../proxy/grpc/v2/client/ClientActivityTest.java   |  4 +--
 .../ChangeInvisibleDurationActivityTest.java       |  2 +-
 .../producer/ForwardMessageToDLQActivityTest.java  |  2 +-
 .../proxy/grpc/v2/route/RouteActivityTest.java     |  2 +-
 .../proxy/processor/ConsumerProcessorTest.java     | 15 +++++----
 .../proxy/processor/ProducerProcessorTest.java     |  4 ++-
 .../processor/ReceiptHandleProcessorTest.java      | 38 +++++++++++-----------
 .../proxy/processor/TransactionProcessorTest.java  |  2 +-
 .../remoting/activity/SendMessageActivityTest.java |  2 +-
 .../rocketmq/proxy/service/BaseServiceTest.java    |  7 ++--
 .../service/message/ClusterMessageServiceTest.java |  3 +-
 .../metadata/ClusterMetadataServiceTest.java       | 11 ++++---
 .../route/ClusterTopicRouteServiceTest.java        |  9 +++--
 .../service/route/LocalTopicRouteServiceTest.java  |  7 ++--
 .../service/sysmessage/HeartbeatSyncerTest.java    |  2 +-
 .../AbstractTransactionServiceTest.java            | 14 +++++---
 .../transaction/ClusterTransactionServiceTest.java | 26 ++++++++-------
 43 files changed, 193 insertions(+), 158 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index de8fba4a63..a60228eb9f 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -287,7 +287,7 @@ public class ClientActivity extends AbstractMessingActivity 
{
         // use topic name as producer group
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, 
clientId, languageCode, parseClientVersion(ctx.getClientVersion()));
         this.messagingProcessor.registerProducer(ctx, topicName, 
clientChannelInfo);
-        TopicMessageType topicMessageType = 
this.messagingProcessor.getMetadataService().getTopicMessageType(topicName);
+        TopicMessageType topicMessageType = 
this.messagingProcessor.getMetadataService().getTopicMessageType(ctx, 
topicName);
         if (TopicMessageType.TRANSACTION.equals(topicMessageType)) {
             this.messagingProcessor.addTransactionSubscription(ctx, topicName, 
topicName);
         }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
index fb31a60624..993f069b94 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
@@ -98,7 +98,7 @@ public class AckMessageActivity extends 
AbstractMessingActivity {
             String handleString = ackMessageEntry.getReceiptHandle();
 
             String group = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
-            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
 group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
+            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, 
ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
             if (messageReceiptHandle != null) {
                 handleString = messageReceiptHandle.getReceiptHandleStr();
             }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
index 0f33cc7aa7..9b7e947e0b 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
@@ -55,7 +55,7 @@ public class ChangeInvisibleDurationActivity extends 
AbstractMessingActivity {
             ReceiptHandle receiptHandle = 
ReceiptHandle.decode(request.getReceiptHandle());
             String group = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
 
-            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
 group, request.getMessageId(), receiptHandle.getReceiptHandle());
+            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, 
request.getMessageId(), receiptHandle.getReceiptHandle());
             if (messageReceiptHandle != null) {
                 receiptHandle = 
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
             }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index 9df4101f73..22a149004c 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -144,7 +144,7 @@ public class ReceiveMessageActivity extends 
AbstractMessingActivity {
                                     MessageReceiptHandle messageReceiptHandle =
                                         new MessageReceiptHandle(group, topic, 
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
                                             messageExt.getQueueOffset(), 
messageExt.getReconsumeTimes());
-                                    
receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
 group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
+                                    
receiptHandleProcessor.addReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), 
receiptHandle, messageReceiptHandle);
                                 }
                             }
                         }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
index 789927d693..6b5c5c7e07 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
@@ -48,7 +48,7 @@ public class ForwardMessageToDLQActivity extends 
AbstractMessingActivity {
 
             String group = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
             String handleString = request.getReceiptHandle();
-            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
 group, request.getMessageId(), request.getReceiptHandle());
+            MessageReceiptHandle messageReceiptHandle = 
receiptHandleProcessor.removeReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, 
request.getMessageId(), request.getReceiptHandle());
             if (messageReceiptHandle != null) {
                 handleString = messageReceiptHandle.getReceiptHandleStr();
             }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index eb7385f874..c5d485691b 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -71,7 +71,7 @@ public class RouteActivity extends AbstractMessingActivity {
             List<MessageQueue> messageQueueList = new ArrayList<>();
             Map<String, Map<Long, Broker>> brokerMap = 
buildBrokerMap(proxyTopicRouteData.getBrokerDatas());
 
-            TopicMessageType topicMessageType = 
messagingProcessor.getMetadataService().getTopicMessageType(topicName);
+            TopicMessageType topicMessageType = 
messagingProcessor.getMetadataService().getTopicMessageType(ctx, topicName);
             for (QueueData queueData : proxyTopicRouteData.getQueueDatas()) {
                 String brokerName = queueData.getBrokerName();
                 Map<Long, Broker> brokerIdMap = brokerMap.get(brokerName);
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index d67f4b855d..c860ee8a1a 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -87,7 +87,7 @@ public class ConsumerProcessor extends AbstractProcessor {
     ) {
         CompletableFuture<PopResult> future = new CompletableFuture<>();
         try {
-            AddressableMessageQueue messageQueue = queueSelector.select(ctx, 
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(topic));
+            AddressableMessageQueue messageQueue = queueSelector.select(ctx, 
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx, 
topic));
             if (messageQueue == null) {
                 throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no 
readable queue");
             }
@@ -287,7 +287,7 @@ public class ConsumerProcessor extends AbstractProcessor {
         CompletableFuture<PullResult> future = new CompletableFuture<>();
         try {
             AddressableMessageQueue addressableMessageQueue = 
serviceManager.getTopicRouteService()
-                .buildAddressableMessageQueue(messageQueue);
+                .buildAddressableMessageQueue(ctx, messageQueue);
             PullMessageRequestHeader requestHeader = new 
PullMessageRequestHeader();
             requestHeader.setConsumerGroup(consumerGroup);
             requestHeader.setTopic(addressableMessageQueue.getTopic());
@@ -311,7 +311,7 @@ public class ConsumerProcessor extends AbstractProcessor {
         CompletableFuture<Void> future = new CompletableFuture<>();
         try {
             AddressableMessageQueue addressableMessageQueue = 
serviceManager.getTopicRouteService()
-                .buildAddressableMessageQueue(messageQueue);
+                .buildAddressableMessageQueue(ctx, messageQueue);
             UpdateConsumerOffsetRequestHeader requestHeader = new 
UpdateConsumerOffsetRequestHeader();
             requestHeader.setConsumerGroup(consumerGroup);
             requestHeader.setTopic(addressableMessageQueue.getTopic());
@@ -329,7 +329,7 @@ public class ConsumerProcessor extends AbstractProcessor {
         CompletableFuture<Long> future = new CompletableFuture<>();
         try {
             AddressableMessageQueue addressableMessageQueue = 
serviceManager.getTopicRouteService()
-                .buildAddressableMessageQueue(messageQueue);
+                .buildAddressableMessageQueue(ctx, messageQueue);
             QueryConsumerOffsetRequestHeader requestHeader = new 
QueryConsumerOffsetRequestHeader();
             requestHeader.setConsumerGroup(consumerGroup);
             requestHeader.setTopic(addressableMessageQueue.getTopic());
@@ -345,7 +345,7 @@ public class ConsumerProcessor extends AbstractProcessor {
         String consumerGroup, String clientId, long timeoutMillis) {
         CompletableFuture<Set<MessageQueue>> future = new 
CompletableFuture<>();
         Set<MessageQueue> successSet = new CopyOnWriteArraySet<>();
-        Set<AddressableMessageQueue> addressableMessageQueueSet = 
buildAddressableSet(mqSet);
+        Set<AddressableMessageQueue> addressableMessageQueueSet = 
buildAddressableSet(ctx, mqSet);
         Map<String, List<AddressableMessageQueue>> messageQueueSetMap = 
buildAddressableMapByBrokerName(addressableMessageQueueSet);
         List<CompletableFuture<Void>> futureList = new ArrayList<>();
         messageQueueSetMap.forEach((k, v) -> {
@@ -370,7 +370,7 @@ public class ConsumerProcessor extends AbstractProcessor {
     public CompletableFuture<Void> unlockBatchMQ(ProxyContext ctx, 
Set<MessageQueue> mqSet,
         String consumerGroup, String clientId, long timeoutMillis) {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        Set<AddressableMessageQueue> addressableMessageQueueSet = 
buildAddressableSet(mqSet);
+        Set<AddressableMessageQueue> addressableMessageQueueSet = 
buildAddressableSet(ctx, mqSet);
         Map<String, List<AddressableMessageQueue>> messageQueueSetMap = 
buildAddressableMapByBrokerName(addressableMessageQueueSet);
         List<CompletableFuture<Void>> futureList = new ArrayList<>();
         messageQueueSetMap.forEach((k, v) -> {
@@ -394,7 +394,7 @@ public class ConsumerProcessor extends AbstractProcessor {
         CompletableFuture<Long> future = new CompletableFuture<>();
         try {
             AddressableMessageQueue addressableMessageQueue = 
serviceManager.getTopicRouteService()
-                .buildAddressableMessageQueue(messageQueue);
+                .buildAddressableMessageQueue(ctx, messageQueue);
             GetMaxOffsetRequestHeader requestHeader = new 
GetMaxOffsetRequestHeader();
             requestHeader.setTopic(addressableMessageQueue.getTopic());
             requestHeader.setQueueId(addressableMessageQueue.getQueueId());
@@ -409,7 +409,7 @@ public class ConsumerProcessor extends AbstractProcessor {
         CompletableFuture<Long> future = new CompletableFuture<>();
         try {
             AddressableMessageQueue addressableMessageQueue = 
serviceManager.getTopicRouteService()
-                .buildAddressableMessageQueue(messageQueue);
+                .buildAddressableMessageQueue(ctx, messageQueue);
             GetMinOffsetRequestHeader requestHeader = new 
GetMinOffsetRequestHeader();
             requestHeader.setTopic(addressableMessageQueue.getTopic());
             requestHeader.setQueueId(addressableMessageQueue.getQueueId());
@@ -420,10 +420,10 @@ public class ConsumerProcessor extends AbstractProcessor {
         return FutureUtils.addExecutor(future, this.executor);
     }
 
-    protected Set<AddressableMessageQueue> 
buildAddressableSet(Set<MessageQueue> mqSet) {
+    protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext 
ctx, Set<MessageQueue> mqSet) {
         return mqSet.stream().map(mq -> {
             try {
-                return 
serviceManager.getTopicRouteService().buildAddressableMessageQueue(mq);
+                return 
serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq);
             } catch (Exception e) {
                 return null;
             }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index bfadc0d3e0..81d2b9df35 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -127,13 +127,13 @@ public class DefaultMessagingProcessor extends 
AbstractStartAndShutdown implemen
 
     @Override
     public SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext 
ctx, String consumerGroupName) {
-        return 
this.serviceManager.getMetadataService().getSubscriptionGroupConfig(consumerGroupName);
+        return 
this.serviceManager.getMetadataService().getSubscriptionGroupConfig(ctx, 
consumerGroupName);
     }
 
     @Override
     public ProxyTopicRouteData getTopicRouteDataForProxy(ProxyContext ctx, 
List<Address> requestHostAndPortList,
         String topicName) throws Exception {
-        return 
this.serviceManager.getTopicRouteService().getTopicRouteForProxy(requestHostAndPortList,
 topicName);
+        return 
this.serviceManager.getTopicRouteService().getTopicRouteForProxy(ctx, 
requestHostAndPortList, topicName);
     }
 
     @Override
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
index 749f9da2be..0d0c621686 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
@@ -73,14 +73,14 @@ public class ProducerProcessor extends AbstractProcessor {
                 if (topicMessageTypeValidator != null) {
                     // Do not check retry or dlq topic
                     if (!NamespaceUtil.isRetryTopic(topic) && 
!NamespaceUtil.isDLQTopic(topic)) {
-                        TopicMessageType topicMessageType = 
serviceManager.getMetadataService().getTopicMessageType(topic);
+                        TopicMessageType topicMessageType = 
serviceManager.getMetadataService().getTopicMessageType(ctx, topic);
                         TopicMessageType messageType = 
TopicMessageType.parseFromMessageProperty(message.getProperties());
                         topicMessageTypeValidator.validate(topicMessageType, 
messageType);
                     }
                 }
             }
             AddressableMessageQueue messageQueue = queueSelector.select(ctx,
-                
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(topic));
+                
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx, 
topic));
             if (messageQueue == null) {
                 throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no 
writable queue");
             }
@@ -102,7 +102,7 @@ public class ProducerProcessor extends AbstractProcessor {
                         if 
(SendStatus.SEND_OK.equals(sendResult.getSendStatus()) &&
                             tranType == 
MessageSysFlag.TRANSACTION_PREPARED_TYPE &&
                             
StringUtils.isNotBlank(sendResult.getTransactionId())) {
-                            fillTransactionData(producerGroup, messageQueue, 
sendResult, messageList);
+                            fillTransactionData(ctx, producerGroup, 
messageQueue, sendResult, messageList);
                         }
                     }
                     return sendResultList;
@@ -113,7 +113,7 @@ public class ProducerProcessor extends AbstractProcessor {
         return FutureUtils.addExecutor(future, this.executor);
     }
 
-    protected void fillTransactionData(String producerGroup, 
AddressableMessageQueue messageQueue, SendResult sendResult, List<Message> 
messageList) {
+    protected void fillTransactionData(ProxyContext ctx, String producerGroup, 
AddressableMessageQueue messageQueue, SendResult sendResult, List<Message> 
messageList) {
         try {
             MessageId id;
             if (sendResult.getOffsetMsgId() != null) {
@@ -122,6 +122,7 @@ public class ProducerProcessor extends AbstractProcessor {
                 id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
             }
             
this.serviceManager.getTransactionService().addTransactionDataByBrokerName(
+                ctx,
                 messageQueue.getBrokerName(),
                 producerGroup,
                 sendResult.getQueueOffset(),
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index c903220bbe..7fe97db798 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -201,7 +201,7 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
                 });
             } else {
                 SubscriptionGroupConfig subscriptionGroupConfig =
-                    
messagingProcessor.getMetadataService().getSubscriptionGroupConfig(messageReceiptHandle.getGroup());
+                    
messagingProcessor.getMetadataService().getSubscriptionGroupConfig(context, 
messageReceiptHandle.getGroup());
                 if (subscriptionGroupConfig == null) {
                     log.error("group's subscriptionGroupConfig is null when 
renew. handle: {}", messageReceiptHandle);
                     return CompletableFuture.completedFuture(null);
@@ -240,12 +240,12 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
         return 
this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), 
groupKey.group, groupKey.channel) == null;
     }
 
-    public void addReceiptHandle(Channel channel, String group, String msgID, 
String receiptHandle,
+    public void addReceiptHandle(ProxyContext ctx, Channel channel, String 
group, String msgID, String receiptHandle,
         MessageReceiptHandle messageReceiptHandle) {
-        this.addReceiptHandle(new ReceiptHandleGroupKey(channel, group), 
msgID, receiptHandle, messageReceiptHandle);
+        this.addReceiptHandle(ctx, new ReceiptHandleGroupKey(channel, group), 
msgID, receiptHandle, messageReceiptHandle);
     }
 
-    protected void addReceiptHandle(ReceiptHandleGroupKey key, String msgID, 
String receiptHandle,
+    protected void addReceiptHandle(ProxyContext ctx, ReceiptHandleGroupKey 
key, String msgID, String receiptHandle,
         MessageReceiptHandle messageReceiptHandle) {
         if (key == null) {
             return;
@@ -254,11 +254,11 @@ public class ReceiptHandleProcessor extends 
AbstractStartAndShutdown {
             k -> new ReceiptHandleGroup()).put(msgID, receiptHandle, 
messageReceiptHandle);
     }
 
-    public MessageReceiptHandle removeReceiptHandle(Channel channel, String 
group, String msgID, String receiptHandle) {
-        return this.removeReceiptHandle(new ReceiptHandleGroupKey(channel, 
group), msgID, receiptHandle);
+    public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel 
channel, String group, String msgID, String receiptHandle) {
+        return this.removeReceiptHandle(ctx, new 
ReceiptHandleGroupKey(channel, group), msgID, receiptHandle);
     }
 
-    protected MessageReceiptHandle removeReceiptHandle(ReceiptHandleGroupKey 
key, String msgID, String receiptHandle) {
+    protected MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, 
ReceiptHandleGroupKey key, String msgID, String receiptHandle) {
         if (key == null) {
             return null;
         }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/TransactionProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/TransactionProcessor.java
index 3b284cd056..c0ba255f54 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/TransactionProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/TransactionProcessor.java
@@ -36,6 +36,7 @@ public class TransactionProcessor extends AbstractProcessor {
         CompletableFuture<Void> future = new CompletableFuture<>();
         try {
             EndTransactionRequestData headerData = 
serviceManager.getTransactionService().genEndTransactionRequestHeader(
+                ctx,
                 producerGroup,
                 buildCommitOrRollback(transactionStatus),
                 fromTransactionCheck,
@@ -70,6 +71,6 @@ public class TransactionProcessor extends AbstractProcessor {
     }
 
     public void addTransactionSubscription(ProxyContext ctx, String 
producerGroup, String topic) {
-        
this.serviceManager.getTransactionService().addTransactionSubscription(producerGroup,
 topic);
+        
this.serviceManager.getTransactionService().addTransactionSubscription(ctx, 
producerGroup, topic);
     }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
index 618d458743..17af0fdcb3 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
@@ -70,7 +70,7 @@ public class SendMessageActivity extends 
AbstractRemotingActivity {
             if (topicMessageTypeValidator != null) {
                 // Do not check retry or dlq topic
                 if (!NamespaceUtil.isRetryTopic(topic) && 
!NamespaceUtil.isDLQTopic(topic)) {
-                    TopicMessageType topicMessageType = 
messagingProcessor.getMetadataService().getTopicMessageType(topic);
+                    TopicMessageType topicMessageType = 
messagingProcessor.getMetadataService().getTopicMessageType(context, topic);
                     topicMessageTypeValidator.validate(topicMessageType, 
messageType);
                 }
             }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index 20beeb5666..95cc4d1497 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.service.admin.AdminService;
@@ -191,7 +192,7 @@ public class ClusterServiceManager extends 
AbstractStartAndShutdown implements S
         @Override
         public void handle(ProducerGroupEvent event, String group, 
ClientChannelInfo clientChannelInfo) {
             if (event == ProducerGroupEvent.GROUP_UNREGISTER) {
-                getTransactionService().unSubscribeAllTransactionTopic(group);
+                
getTransactionService().unSubscribeAllTransactionTopic(ProxyContext.createForInner(this.getClass()),
 group);
             }
         }
     }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
index 7150967d45..9f163f1b98 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
@@ -80,7 +80,7 @@ public class ClusterMessageService implements MessageService {
     public CompletableFuture<RemotingCommand> sendMessageBack(ProxyContext 
ctx, ReceiptHandle handle, String messageId,
         ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) {
         return this.mqClientAPIFactory.getClient().sendMessageBackAsync(
-            this.resolveBrokerAddrInReceiptHandle(handle),
+            this.resolveBrokerAddrInReceiptHandle(ctx, handle),
             requestHeader,
             timeoutMillis
         );
@@ -93,7 +93,7 @@ public class ClusterMessageService implements MessageService {
         CompletableFuture<Void> future = new CompletableFuture<>();
         try {
             this.mqClientAPIFactory.getClient().endTransactionOneway(
-                this.resolveBrokerAddr(brokerName),
+                this.resolveBrokerAddr(ctx, brokerName),
                 requestHeader,
                 "end transaction from proxy",
                 timeoutMillis
@@ -120,7 +120,7 @@ public class ClusterMessageService implements 
MessageService {
     public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, 
ReceiptHandle handle, String messageId,
         ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) {
         return this.mqClientAPIFactory.getClient().changeInvisibleTimeAsync(
-            this.resolveBrokerAddrInReceiptHandle(handle),
+            this.resolveBrokerAddrInReceiptHandle(ctx, handle),
             handle.getBrokerName(),
             requestHeader,
             timeoutMillis
@@ -131,7 +131,7 @@ public class ClusterMessageService implements 
MessageService {
     public CompletableFuture<AckResult> ackMessage(ProxyContext ctx, 
ReceiptHandle handle, String messageId,
         AckMessageRequestHeader requestHeader, long timeoutMillis) {
         return this.mqClientAPIFactory.getClient().ackMessageAsync(
-            this.resolveBrokerAddrInReceiptHandle(handle),
+            this.resolveBrokerAddrInReceiptHandle(ctx, handle),
             requestHeader,
             timeoutMillis
         );
@@ -211,7 +211,7 @@ public class ClusterMessageService implements 
MessageService {
     public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String 
brokerName, RemotingCommand request,
         long timeoutMillis) {
         try {
-            String brokerAddress = topicRouteService.getBrokerAddr(brokerName);
+            String brokerAddress = topicRouteService.getBrokerAddr(ctx, 
brokerName);
             return mqClientAPIFactory.getClient().invoke(brokerAddress, 
request, timeoutMillis);
         } catch (Throwable t) {
             return FutureUtils.completeExceptionally(t);
@@ -222,24 +222,24 @@ public class ClusterMessageService implements 
MessageService {
     public CompletableFuture<Void> requestOneway(ProxyContext ctx, String 
brokerName, RemotingCommand request,
         long timeoutMillis) {
         try {
-            String brokerAddress = topicRouteService.getBrokerAddr(brokerName);
+            String brokerAddress = topicRouteService.getBrokerAddr(ctx, 
brokerName);
             return mqClientAPIFactory.getClient().invokeOneway(brokerAddress, 
request, timeoutMillis);
         } catch (Throwable t) {
             return FutureUtils.completeExceptionally(t);
         }
     }
 
-    protected String resolveBrokerAddrInReceiptHandle(ReceiptHandle handle) {
+    protected String resolveBrokerAddrInReceiptHandle(ProxyContext ctx, 
ReceiptHandle handle) {
         try {
-            return 
this.topicRouteService.getBrokerAddr(handle.getBrokerName());
+            return this.topicRouteService.getBrokerAddr(ctx, 
handle.getBrokerName());
         } catch (Throwable t) {
             throw new 
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "cannot find broker " 
+ handle.getBrokerName(), t);
         }
     }
 
-    protected String resolveBrokerAddr(String brokerName) {
+    protected String resolveBrokerAddr(ProxyContext ctx, String brokerName) {
         try {
-            return this.topicRouteService.getBrokerAddr(brokerName);
+            return this.topicRouteService.getBrokerAddr(ctx, brokerName);
         } catch (Throwable t) {
             throw new ProxyException(ProxyExceptionCode.INVALID_BROKER_NAME, 
"cannot find broker " + brokerName, t);
         }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
index 7934d3c860..bc9582ad81 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.AbstractCacheLoader;
 import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
@@ -83,7 +84,7 @@ public class ClusterMetadataService extends 
AbstractStartAndShutdown implements
     }
 
     @Override
-    public TopicMessageType getTopicMessageType(String topic) {
+    public TopicMessageType getTopicMessageType(ProxyContext ctx, String 
topic) {
         TopicConfigAndQueueMapping topicConfigAndQueueMapping;
         try {
             topicConfigAndQueueMapping = topicConfigCache.get(topic);
@@ -97,7 +98,7 @@ public class ClusterMetadataService extends 
AbstractStartAndShutdown implements
     }
 
     @Override
-    public SubscriptionGroupConfig getSubscriptionGroupConfig(String group) {
+    public SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext 
ctx, String group) {
         SubscriptionGroupConfig config;
         try {
             config = this.subscriptionGroupConfigCache.get(group);
@@ -158,7 +159,7 @@ public class ClusterMetadataService extends 
AbstractStartAndShutdown implements
 
     protected Optional<BrokerData> findOneBroker(String topic) throws 
Exception {
         try {
-            return 
topicRouteService.getAllMessageQueueView(topic).getTopicRouteData().getBrokerDatas().stream().findAny();
+            return 
topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()),
 topic).getTopicRouteData().getBrokerDatas().stream().findAny();
         } catch (Exception e) {
             if (TopicRouteHelper.isTopicNotExistError(e)) {
                 return Optional.empty();
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/LocalMetadataService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/LocalMetadataService.java
index bc1d03e74b..7f3c041f25 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/LocalMetadataService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/LocalMetadataService.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.proxy.service.metadata;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 
 public class LocalMetadataService implements MetadataService {
@@ -30,7 +31,7 @@ public class LocalMetadataService implements MetadataService {
     }
 
     @Override
-    public TopicMessageType getTopicMessageType(String topic) {
+    public TopicMessageType getTopicMessageType(ProxyContext ctx, String 
topic) {
         TopicConfig topicConfig = 
brokerController.getTopicConfigManager().selectTopicConfig(topic);
         if (topicConfig == null) {
             return TopicMessageType.UNSPECIFIED;
@@ -39,7 +40,7 @@ public class LocalMetadataService implements MetadataService {
     }
 
     @Override
-    public SubscriptionGroupConfig getSubscriptionGroupConfig(String group) {
+    public SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext 
ctx, String group) {
         return 
this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group);
     }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/MetadataService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/MetadataService.java
index d5e38f145b..3ee0f3eacd 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/MetadataService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/MetadataService.java
@@ -18,11 +18,12 @@
 package org.apache.rocketmq.proxy.service.metadata;
 
 import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 
 public interface MetadataService {
 
-    TopicMessageType getTopicMessageType(String topic);
+    TopicMessageType getTopicMessageType(ProxyContext ctx, String topic);
 
-    SubscriptionGroupConfig getSubscriptionGroupConfig(String group);
+    SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext ctx, 
String group);
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/AbstractProxyRelayService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/AbstractProxyRelayService.java
index ed68d1d3ae..08f00bd83c 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/AbstractProxyRelayService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/AbstractProxyRelayService.java
@@ -43,6 +43,7 @@ public abstract class AbstractProxyRelayService implements 
ProxyRelayService {
         CompletableFuture<ProxyRelayResult<Void>> future = new 
CompletableFuture<>();
         String group = 
messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
         TransactionData transactionData = 
transactionService.addTransactionDataByBrokerAddr(
+            context,
             command.getExtFields().get(ProxyUtils.BROKER_ADDR),
             group,
             header.getTranStateTableOffset(),
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
index 31e1f94c55..fb97002df7 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
@@ -21,6 +21,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.proxy.common.Address;
 import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
 import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 
@@ -31,14 +32,14 @@ public class ClusterTopicRouteService extends 
TopicRouteService {
     }
 
     @Override
-    public MessageQueueView getCurrentMessageQueueView(String topicName) 
throws Exception {
-        return getAllMessageQueueView(topicName);
+    public MessageQueueView getCurrentMessageQueueView(ProxyContext ctx, 
String topicName) throws Exception {
+        return getAllMessageQueueView(ctx, topicName);
     }
 
     @Override
-    public ProxyTopicRouteData getTopicRouteForProxy(List<Address> 
requestHostAndPortList,
+    public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, 
List<Address> requestHostAndPortList,
         String topicName) throws Exception {
-        TopicRouteData topicRouteData = 
getAllMessageQueueView(topicName).getTopicRouteData();
+        TopicRouteData topicRouteData = getAllMessageQueueView(ctx, 
topicName).getTopicRouteData();
 
         ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData();
         proxyTopicRouteData.setQueueDatas(topicRouteData.getQueueDatas());
@@ -57,8 +58,8 @@ public class ClusterTopicRouteService extends 
TopicRouteService {
     }
 
     @Override
-    public String getBrokerAddr(String brokerName) throws Exception {
-        List<BrokerData> brokerDataList = 
getAllMessageQueueView(brokerName).getTopicRouteData().getBrokerDatas();
+    public String getBrokerAddr(ProxyContext ctx, String brokerName) throws 
Exception {
+        List<BrokerData> brokerDataList = getAllMessageQueueView(ctx, 
brokerName).getTopicRouteData().getBrokerDatas();
         if (brokerDataList.isEmpty()) {
             return null;
         }
@@ -66,8 +67,8 @@ public class ClusterTopicRouteService extends 
TopicRouteService {
     }
 
     @Override
-    public AddressableMessageQueue buildAddressableMessageQueue(MessageQueue 
messageQueue) throws Exception {
-        String brokerAddress = getBrokerAddr(messageQueue.getBrokerName());
+    public AddressableMessageQueue buildAddressableMessageQueue(ProxyContext 
ctx, MessageQueue messageQueue) throws Exception {
+        String brokerAddress = getBrokerAddr(ctx, 
messageQueue.getBrokerName());
         return new AddressableMessageQueue(messageQueue, brokerAddress);
     }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
index 3ac7ae75b9..d67b68f38e 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.proxy.common.Address;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
@@ -51,15 +52,15 @@ public class LocalTopicRouteService extends 
TopicRouteService {
     }
 
     @Override
-    public MessageQueueView getCurrentMessageQueueView(String topic) throws 
Exception {
+    public MessageQueueView getCurrentMessageQueueView(ProxyContext ctx, 
String topic) throws Exception {
         TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
         return new MessageQueueView(topic, toTopicRouteData(topicConfig));
     }
 
     @Override
-    public ProxyTopicRouteData getTopicRouteForProxy(List<Address> 
requestHostAndPortList,
+    public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx, 
List<Address> requestHostAndPortList,
         String topicName) throws Exception {
-        MessageQueueView messageQueueView = getAllMessageQueueView(topicName);
+        MessageQueueView messageQueueView = getAllMessageQueueView(ctx, 
topicName);
         TopicRouteData topicRouteData = messageQueueView.getTopicRouteData();
 
         ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData();
@@ -83,13 +84,13 @@ public class LocalTopicRouteService extends 
TopicRouteService {
     }
 
     @Override
-    public String getBrokerAddr(String brokerName) throws Exception {
+    public String getBrokerAddr(ProxyContext ctx, String brokerName) throws 
Exception {
         return this.brokerController.getBrokerAddr();
     }
 
     @Override
-    public AddressableMessageQueue buildAddressableMessageQueue(MessageQueue 
messageQueue) throws Exception {
-        String brokerAddress = getBrokerAddr(messageQueue.getBrokerName());
+    public AddressableMessageQueue buildAddressableMessageQueue(ProxyContext 
ctx, MessageQueue messageQueue) throws Exception {
+        String brokerAddress = getBrokerAddr(ctx, 
messageQueue.getBrokerName());
         return new AddressableMessageQueue(messageQueue, brokerAddress);
     }
 
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index ba97e183b0..3fa6414c39 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.AbstractCacheLoader;
 import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
 import org.apache.rocketmq.proxy.common.Address;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
@@ -109,18 +110,18 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
         this.appendStartAndShutdown(this.mqClientAPIFactory);
     }
 
-    public MessageQueueView getAllMessageQueueView(String topicName) throws 
Exception {
+    public MessageQueueView getAllMessageQueueView(ProxyContext ctx, String 
topicName) throws Exception {
         return getCacheMessageQueueWrapper(this.topicCache, topicName);
     }
 
-    public abstract MessageQueueView getCurrentMessageQueueView(String 
topicName) throws Exception;
+    public abstract MessageQueueView getCurrentMessageQueueView(ProxyContext 
ctx, String topicName) throws Exception;
 
-    public abstract ProxyTopicRouteData getTopicRouteForProxy(List<Address> 
requestHostAndPortList,
+    public abstract ProxyTopicRouteData getTopicRouteForProxy(ProxyContext 
ctx, List<Address> requestHostAndPortList,
         String topicName) throws Exception;
 
-    public abstract String getBrokerAddr(String brokerName) throws Exception;
+    public abstract String getBrokerAddr(ProxyContext ctx, String brokerName) 
throws Exception;
 
-    public abstract AddressableMessageQueue 
buildAddressableMessageQueue(MessageQueue messageQueue) throws Exception;
+    public abstract AddressableMessageQueue 
buildAddressableMessageQueue(ProxyContext ctx, MessageQueue messageQueue) 
throws Exception;
 
     protected static MessageQueueView 
getCacheMessageQueueWrapper(LoadingCache<String, MessageQueueView> topicCache,
         String key) throws Exception {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
index cc988db782..2ef8497374 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
@@ -95,7 +96,7 @@ public abstract class AbstractSystemMessageSyncer implements 
StartAndShutdown, M
                 JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8)
             );
 
-            AddressableMessageQueue messageQueue = 
this.topicRouteService.getAllMessageQueueView(targetTopic)
+            AddressableMessageQueue messageQueue = 
this.topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()),
 targetTopic)
                 .getWriteSelector().selectOne(true);
             this.mqClientAPIFactory.getClient().sendMessageAsync(
                 messageQueue.getBrokerAddr(),
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
index 3254d711d7..f0e083adea 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
@@ -29,13 +29,13 @@ public abstract class AbstractTransactionService implements 
TransactionService,
     protected TransactionDataManager transactionDataManager = new 
TransactionDataManager();
 
     @Override
-    public TransactionData addTransactionDataByBrokerAddr(String brokerAddr, 
String producerGroup, long tranStateTableOffset, long commitLogOffset, String 
transactionId,
+    public TransactionData addTransactionDataByBrokerAddr(ProxyContext ctx, 
String brokerAddr, String producerGroup, long tranStateTableOffset, long 
commitLogOffset, String transactionId,
         Message message) {
-        return 
this.addTransactionDataByBrokerName(this.getBrokerNameByAddr(brokerAddr), 
producerGroup, tranStateTableOffset, commitLogOffset, transactionId, message);
+        return this.addTransactionDataByBrokerName(ctx, 
this.getBrokerNameByAddr(brokerAddr), producerGroup, tranStateTableOffset, 
commitLogOffset, transactionId, message);
     }
 
     @Override
-    public TransactionData addTransactionDataByBrokerName(String brokerName, 
String producerGroup, long tranStateTableOffset, long commitLogOffset, String 
transactionId,
+    public TransactionData addTransactionDataByBrokerName(ProxyContext ctx, 
String brokerName, String producerGroup, long tranStateTableOffset, long 
commitLogOffset, String transactionId,
         Message message) {
         if (StringUtils.isBlank(brokerName)) {
             return null;
@@ -55,7 +55,7 @@ public abstract class AbstractTransactionService implements 
TransactionService,
     }
 
     @Override
-    public EndTransactionRequestData genEndTransactionRequestHeader(String 
producerGroup, Integer commitOrRollback,
+    public EndTransactionRequestData 
genEndTransactionRequestHeader(ProxyContext ctx, String producerGroup, Integer 
commitOrRollback,
         boolean fromTransactionCheck, String msgId, String transactionId) {
         TransactionData transactionData = 
this.transactionDataManager.pollNoExpireTransactionData(producerGroup, 
transactionId);
         if (transactionData == null) {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
index eff3ac4922..1ec4286463 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
@@ -67,20 +68,20 @@ public class ClusterTransactionService extends 
AbstractTransactionService {
     }
 
     @Override
-    public void addTransactionSubscription(String group, List<String> 
topicList) {
+    public void addTransactionSubscription(ProxyContext ctx, String group, 
List<String> topicList) {
         for (String topic : topicList) {
-            addTransactionSubscription(group, topic);
+            addTransactionSubscription(ctx, group, topic);
         }
     }
 
     @Override
-    public void addTransactionSubscription(String group, String topic) {
+    public void addTransactionSubscription(ProxyContext ctx, String group, 
String topic) {
         try {
             groupClusterData.compute(group, (groupName, clusterDataSet) -> {
                 if (clusterDataSet == null) {
                     clusterDataSet = Sets.newHashSet();
                 }
-                clusterDataSet.addAll(getClusterDataFromTopic(topic));
+                clusterDataSet.addAll(getClusterDataFromTopic(ctx, topic));
                 return clusterDataSet;
             });
         } catch (Exception e) {
@@ -89,17 +90,17 @@ public class ClusterTransactionService extends 
AbstractTransactionService {
     }
 
     @Override
-    public void replaceTransactionSubscription(String group, List<String> 
topicList) {
+    public void replaceTransactionSubscription(ProxyContext ctx, String group, 
List<String> topicList) {
         Set<ClusterData> clusterDataSet = new HashSet<>();
         for (String topic : topicList) {
-            clusterDataSet.addAll(getClusterDataFromTopic(topic));
+            clusterDataSet.addAll(getClusterDataFromTopic(ctx, topic));
         }
         groupClusterData.put(group, clusterDataSet);
     }
 
-    private Set<ClusterData> getClusterDataFromTopic(String topic) {
+    private Set<ClusterData> getClusterDataFromTopic(ProxyContext ctx, String 
topic) {
         try {
-            MessageQueueView messageQueue = 
this.topicRouteService.getAllMessageQueueView(topic);
+            MessageQueueView messageQueue = 
this.topicRouteService.getAllMessageQueueView(ctx, topic);
             List<BrokerData> brokerDataList = 
messageQueue.getTopicRouteData().getBrokerDatas();
 
             if (brokerDataList == null) {
@@ -117,7 +118,7 @@ public class ClusterTransactionService extends 
AbstractTransactionService {
     }
 
     @Override
-    public void unSubscribeAllTransactionTopic(String group) {
+    public void unSubscribeAllTransactionTopic(ProxyContext ctx, String group) 
{
         groupClusterData.remove(group);
     }
 
@@ -195,7 +196,7 @@ public class ClusterTransactionService extends 
AbstractTransactionService {
 
     protected void sendHeartBeatToCluster(String clusterName, HeartbeatData 
heartbeatData, Map<String, String> brokerAddrNameMap) {
         try {
-            MessageQueueView messageQueue = 
this.topicRouteService.getAllMessageQueueView(clusterName);
+            MessageQueueView messageQueue = 
this.topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()),
 clusterName);
             List<BrokerData> brokerDataList = 
messageQueue.getTopicRouteData().getBrokerDatas();
             if (brokerDataList == null) {
                 return;
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/LocalTransactionService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/LocalTransactionService.java
index 2371b25a24..4a27e4ff24 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/LocalTransactionService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/LocalTransactionService.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.proxy.service.transaction;
 
 import java.util.List;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 
 /**
  * no need to implements, because the channel of producer will put into the 
broker's producerManager
@@ -31,22 +32,22 @@ public class LocalTransactionService extends 
AbstractTransactionService {
     }
 
     @Override
-    public void addTransactionSubscription(String group, List<String> 
topicList) {
+    public void addTransactionSubscription(ProxyContext ctx, String group, 
List<String> topicList) {
 
     }
 
     @Override
-    public void addTransactionSubscription(String group, String topic) {
+    public void addTransactionSubscription(ProxyContext ctx, String group, 
String topic) {
 
     }
 
     @Override
-    public void replaceTransactionSubscription(String group, List<String> 
topicList) {
+    public void replaceTransactionSubscription(ProxyContext ctx, String group, 
List<String> topicList) {
 
     }
 
     @Override
-    public void unSubscribeAllTransactionTopic(String group) {
+    public void unSubscribeAllTransactionTopic(ProxyContext ctx, String group) 
{
 
     }
 
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionService.java
index 2a851051eb..a7ab353242 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionService.java
@@ -22,21 +22,21 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
 
 public interface TransactionService {
 
-    void addTransactionSubscription(String group, List<String> topicList);
+    void addTransactionSubscription(ProxyContext ctx, String group, 
List<String> topicList);
 
-    void addTransactionSubscription(String group, String topic);
+    void addTransactionSubscription(ProxyContext ctx, String group, String 
topic);
 
-    void replaceTransactionSubscription(String group, List<String> topicList);
+    void replaceTransactionSubscription(ProxyContext ctx, String group, 
List<String> topicList);
 
-    void unSubscribeAllTransactionTopic(String group);
+    void unSubscribeAllTransactionTopic(ProxyContext ctx, String group);
 
-    TransactionData addTransactionDataByBrokerAddr(String brokerAddr, String 
producerGroup, long tranStateTableOffset, long commitLogOffset, String 
transactionId,
+    TransactionData addTransactionDataByBrokerAddr(ProxyContext ctx, String 
brokerAddr, String producerGroup, long tranStateTableOffset, long 
commitLogOffset, String transactionId,
         Message message);
 
-    TransactionData addTransactionDataByBrokerName(String brokerName, String 
producerGroup, long tranStateTableOffset, long commitLogOffset, String 
transactionId,
+    TransactionData addTransactionDataByBrokerName(ProxyContext ctx, String 
brokerName, String producerGroup, long tranStateTableOffset, long 
commitLogOffset, String transactionId,
         Message message);
 
-    EndTransactionRequestData genEndTransactionRequestHeader(String 
producerGroup, Integer commitOrRollback,
+    EndTransactionRequestData genEndTransactionRequestHeader(ProxyContext ctx, 
String producerGroup, Integer commitOrRollback,
         boolean fromTransactionCheck, String msgId, String transactionId);
 
     void onSendCheckTransactionStateFailed(ProxyContext context, String 
producerGroup, TransactionData transactionData);
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
index 8431077020..a5d4e3c919 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
@@ -134,7 +134,7 @@ public class ClientActivityTest extends BaseActivityTest {
             txProducerTopicArgumentCaptor.capture()
         );
 
-        
when(this.metadataService.getTopicMessageType(anyString())).thenReturn(TopicMessageType.TRANSACTION);
+        when(this.metadataService.getTopicMessageType(any(), 
anyString())).thenReturn(TopicMessageType.TRANSACTION);
 
         HeartbeatResponse response = this.sendProducerHeartbeat(context);
 
@@ -222,7 +222,7 @@ public class ClientActivityTest extends BaseActivityTest {
             .build());
         ArgumentCaptor<ClientChannelInfo> channelInfoArgumentCaptor = 
ArgumentCaptor.forClass(ClientChannelInfo.class);
         doNothing().when(this.messagingProcessor).unRegisterProducer(any(), 
anyString(), channelInfoArgumentCaptor.capture());
-        
when(this.metadataService.getTopicMessageType(anyString())).thenReturn(TopicMessageType.NORMAL);
+        when(this.metadataService.getTopicMessageType(any(), 
anyString())).thenReturn(TopicMessageType.NORMAL);
 
         this.sendProducerTelemetry(context);
         this.sendProducerHeartbeat(context);
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
index a861e8c13f..fdd052da76 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
@@ -92,7 +92,7 @@ public class ChangeInvisibleDurationActivityTest extends 
BaseActivityTest {
         when(this.messagingProcessor.changeInvisibleTime(
             any(), receiptHandleCaptor.capture(), anyString(), anyString(), 
anyString(), invisibleTimeArgumentCaptor.capture()
         )).thenReturn(CompletableFuture.completedFuture(ackResult));
-        when(receiptHandleProcessor.removeReceiptHandle(any(), anyString(), 
anyString(), anyString()))
+        when(receiptHandleProcessor.removeReceiptHandle(any(), any(), 
anyString(), anyString(), anyString()))
             .thenReturn(new MessageReceiptHandle("group", "topic", 0, 
savedHandleStr, "msgId", 0, 0));
 
         ChangeInvisibleDurationResponse response = 
this.changeInvisibleDurationActivity.changeInvisibleDuration(
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
index 68db3020e3..ec620340c5 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
@@ -75,7 +75,7 @@ public class ForwardMessageToDLQActivityTest extends 
BaseActivityTest {
             
.thenReturn(CompletableFuture.completedFuture(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
 "")));
 
         String savedHandleStr = buildReceiptHandle("topic", 
System.currentTimeMillis(),3000);
-        when(receiptHandleProcessor.removeReceiptHandle(any(), anyString(), 
anyString(), anyString()))
+        when(receiptHandleProcessor.removeReceiptHandle(any(), any(), 
anyString(), anyString(), anyString()))
             .thenReturn(new MessageReceiptHandle("group", "topic", 0, 
savedHandleStr, "msgId", 0, 0));
 
         ForwardMessageToDeadLetterQueueResponse response = 
this.forwardMessageToDLQActivity.forwardMessageToDeadLetterQueue(
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
index ce98b7494d..a7ba69098b 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
@@ -101,7 +101,7 @@ public class RouteActivityTest extends BaseActivityTest {
             .thenReturn(createProxyTopicRouteData(2, 2, 6));
         MetadataService metadataService = 
Mockito.mock(LocalMetadataService.class);
         
when(this.messagingProcessor.getMetadataService()).thenReturn(metadataService);
-        
when(metadataService.getTopicMessageType(anyString())).thenReturn(TopicMessageType.NORMAL);
+        when(metadataService.getTopicMessageType(any(), 
anyString())).thenReturn(TopicMessageType.NORMAL);
 
         QueryRouteResponse response = this.routeActivity.queryRoute(
             createContext(),
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
index c695eb0944..876b25b30b 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
 import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
 import org.apache.rocketmq.proxy.service.route.MessageQueueView;
@@ -91,7 +92,7 @@ public class ConsumerProcessorTest extends BaseProcessorTest {
         when(this.messageService.popMessage(any(), 
messageQueueArgumentCaptor.capture(), requestHeaderArgumentCaptor.capture(), 
anyLong()))
             .thenReturn(CompletableFuture.completedFuture(innerPopResult));
 
-        when(this.topicRouteService.getCurrentMessageQueueView(anyString()))
+        when(this.topicRouteService.getCurrentMessageQueueView(any(), 
anyString()))
             .thenReturn(mock(MessageQueueView.class));
 
         ArgumentCaptor<String> ackMessageIdArgumentCaptor = 
ArgumentCaptor.forClass(String.class);
@@ -191,12 +192,12 @@ public class ConsumerProcessorTest extends 
BaseProcessorTest {
         AddressableMessageQueue addressableMessageQueue2 = new 
AddressableMessageQueue(mq2, "127.0.0.1");
         mqSet.add(mq1);
         mqSet.add(mq2);
-        
when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i 
-> new AddressableMessageQueue((MessageQueue) i.getArguments()[0], 
"127.0.0.1"));
+        when(this.topicRouteService.buildAddressableMessageQueue(any(), 
any())).thenAnswer(i -> new AddressableMessageQueue((MessageQueue) 
i.getArguments()[1], "127.0.0.1"));
         when(this.messageService.lockBatchMQ(any(), 
eq(addressableMessageQueue1), any(), anyLong()))
             
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1)));
         when(this.messageService.lockBatchMQ(any(), 
eq(addressableMessageQueue2), any(), anyLong()))
             
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq2)));
-        Set<MessageQueue> result = this.consumerProcessor.lockBatchMQ(null, 
mqSet, CONSUMER_GROUP, CLIENT_ID, 1000)
+        Set<MessageQueue> result = 
this.consumerProcessor.lockBatchMQ(ProxyContext.create(), mqSet, 
CONSUMER_GROUP, CLIENT_ID, 1000)
             .get();
         assertThat(result).isEqualTo(mqSet);
     }
@@ -210,12 +211,12 @@ public class ConsumerProcessorTest extends 
BaseProcessorTest {
         AddressableMessageQueue addressableMessageQueue2 = new 
AddressableMessageQueue(mq2, "127.0.0.1");
         mqSet.add(mq1);
         mqSet.add(mq2);
-        
when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i 
-> new AddressableMessageQueue((MessageQueue) i.getArguments()[0], 
"127.0.0.1"));
+        when(this.topicRouteService.buildAddressableMessageQueue(any(), 
any())).thenAnswer(i -> new AddressableMessageQueue((MessageQueue) 
i.getArguments()[1], "127.0.0.1"));
         when(this.messageService.lockBatchMQ(any(), 
eq(addressableMessageQueue1), any(), anyLong()))
             
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1)));
         when(this.messageService.lockBatchMQ(any(), 
eq(addressableMessageQueue2), any(), anyLong()))
             .thenReturn(CompletableFuture.completedFuture(Sets.newHashSet()));
-        Set<MessageQueue> result = this.consumerProcessor.lockBatchMQ(null, 
mqSet, CONSUMER_GROUP, CLIENT_ID, 1000)
+        Set<MessageQueue> result = 
this.consumerProcessor.lockBatchMQ(ProxyContext.create(), mqSet, 
CONSUMER_GROUP, CLIENT_ID, 1000)
             .get();
         assertThat(result).isEqualTo(Sets.newHashSet(mq1));
     }
@@ -229,14 +230,14 @@ public class ConsumerProcessorTest extends 
BaseProcessorTest {
         AddressableMessageQueue addressableMessageQueue2 = new 
AddressableMessageQueue(mq2, "127.0.0.1");
         mqSet.add(mq1);
         mqSet.add(mq2);
-        
when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i 
-> new AddressableMessageQueue((MessageQueue) i.getArguments()[0], 
"127.0.0.1"));
+        when(this.topicRouteService.buildAddressableMessageQueue(any(), 
any())).thenAnswer(i -> new AddressableMessageQueue((MessageQueue) 
i.getArguments()[1], "127.0.0.1"));
         when(this.messageService.lockBatchMQ(any(), 
eq(addressableMessageQueue1), any(), anyLong()))
             
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1)));
         CompletableFuture<Set<MessageQueue>> future = new 
CompletableFuture<>();
         future.completeExceptionally(new MQBrokerException(1, "err"));
         when(this.messageService.lockBatchMQ(any(), 
eq(addressableMessageQueue2), any(), anyLong()))
             .thenReturn(future);
-        Set<MessageQueue> result = this.consumerProcessor.lockBatchMQ(null, 
mqSet, CONSUMER_GROUP, CLIENT_ID, 1000)
+        Set<MessageQueue> result = 
this.consumerProcessor.lockBatchMQ(ProxyContext.create(), mqSet, 
CONSUMER_GROUP, CLIENT_ID, 1000)
             .get();
         assertThat(result).isEqualTo(Sets.newHashSet(mq1));
     }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java
index 213e6a6beb..de63b7e75f 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java
@@ -70,7 +70,7 @@ public class ProducerProcessorTest extends BaseProcessorTest {
 
     @Test
     public void testSendMessage() throws Throwable {
-        
when(metadataService.getTopicMessageType(eq(TOPIC))).thenReturn(TopicMessageType.NORMAL);
+        when(metadataService.getTopicMessageType(any(), 
eq(TOPIC))).thenReturn(TopicMessageType.NORMAL);
         String txId = MessageClientIDSetter.createUniqID();
         String msgId = MessageClientIDSetter.createUniqID();
         long commitLogOffset = 1000L;
@@ -96,6 +96,7 @@ public class ProducerProcessorTest extends BaseProcessorTest {
         ArgumentCaptor<Long> tranStateTableOffsetCaptor = 
ArgumentCaptor.forClass(Long.class);
         ArgumentCaptor<Long> commitLogOffsetCaptor = 
ArgumentCaptor.forClass(Long.class);
         when(transactionService.addTransactionDataByBrokerName(
+            any(),
             brokerNameCaptor.capture(),
             anyString(),
             tranStateTableOffsetCaptor.capture(),
@@ -150,6 +151,7 @@ public class ProducerProcessorTest extends 
BaseProcessorTest {
         ArgumentCaptor<Long> tranStateTableOffsetCaptor = 
ArgumentCaptor.forClass(Long.class);
         ArgumentCaptor<Long> commitLogOffsetCaptor = 
ArgumentCaptor.forClass(Long.class);
         when(transactionService.addTransactionDataByBrokerName(
+            any(),
             brokerNameCaptor.capture(),
             anyString(),
             tranStateTableOffsetCaptor.capture(),
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index c0bff981f0..7206e6b791 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -107,8 +107,8 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
     @Test
     public void testAddReceiptHandle() {
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
-        
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(new
 SubscriptionGroupConfig());
+        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, receiptHandle, messageReceiptHandle);
+        Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
         Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
@@ -120,9 +120,9 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
     public void testRenewReceiptHandle() {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, receiptHandle, messageReceiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
-        
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
+        Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         long newInvisibleTime = 18000L;
 
@@ -167,7 +167,7 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
         Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, receiptHandle, messageReceiptHandle);
 
         CompletableFuture<AckResult> ackResultFuture = new 
CompletableFuture<>();
         ackResultFuture.completeExceptionally(new MQClientException(0, 
"error"));
@@ -197,7 +197,7 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
     public void testRenewWithInvalidHandle() {
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
         Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, receiptHandle, messageReceiptHandle);
 
         CompletableFuture<AckResult> ackResultFuture = new 
CompletableFuture<>();
         ackResultFuture.completeExceptionally(new 
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
@@ -221,7 +221,7 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
         Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, receiptHandle, messageReceiptHandle);
 
         AtomicInteger count = new AtomicInteger(0);
         List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
@@ -299,10 +299,10 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
newReceiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, newReceiptHandle, messageReceiptHandle);
         Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
-        
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
+        Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), 
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), 
Mockito.anyLong()))
             .thenReturn(CompletableFuture.completedFuture(new AckResult()));
         receiptHandleProcessor.scheduleRenewTask();
@@ -333,9 +333,9 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
newReceiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, newReceiptHandle, messageReceiptHandle);
         Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-        
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(null);
+        Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(null);
         Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(), 
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), 
Mockito.anyLong()))
             .thenReturn(CompletableFuture.completedFuture(new AckResult()));
         receiptHandleProcessor.scheduleRenewTask();
@@ -369,9 +369,9 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
         messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC, 
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
             RECONSUME_TIMES);
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
newReceiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, newReceiptHandle, messageReceiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
-        
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
+        Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(), 
Mockito.eq(GROUP), 
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0))
@@ -382,10 +382,10 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
     @Test
     public void testRemoveReceiptHandle() {
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
-        receiptHandleProcessor.removeReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle);
+        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, receiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.removeReceiptHandle(PROXY_CONTEXT, channel, 
GROUP, MSG_ID, receiptHandle);
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
-        
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
+        Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.any(ReceiptHandle.class), Mockito.anyString(),
@@ -395,10 +395,10 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
     @Test
     public void testClearGroup() {
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, receiptHandle, messageReceiptHandle);
         receiptHandleProcessor.clearGroup(new 
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
         SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
-        
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
+        Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), 
Mockito.eq(GROUP))).thenReturn(groupConfig);
         receiptHandleProcessor.scheduleRenewTask();
         Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
             .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
@@ -410,7 +410,7 @@ public class ReceiptHandleProcessorTest extends 
BaseProcessorTest {
         ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor = 
ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
         Mockito.verify(messagingProcessor, 
Mockito.times(1)).registerConsumerListener(listenerArgumentCaptor.capture());
         Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
-        receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID, 
receiptHandle, messageReceiptHandle);
+        receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, 
MSG_ID, receiptHandle, messageReceiptHandle);
         
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, 
GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
         assertTrue(receiptHandleProcessor.receiptHandleGroupMap.isEmpty());
     }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/TransactionProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/TransactionProcessorTest.java
index f9473b450e..6bffb15bd1 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/TransactionProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/TransactionProcessorTest.java
@@ -54,7 +54,7 @@ public class TransactionProcessorTest extends 
BaseProcessorTest {
     protected void testEndTransaction(int sysFlag, TransactionStatus 
transactionStatus) throws Throwable {
         when(this.messageService.endTransactionOneway(any(), any(), any(), 
anyLong())).thenReturn(CompletableFuture.completedFuture(null));
         ArgumentCaptor<Integer> commitOrRollbackCaptor = 
ArgumentCaptor.forClass(Integer.class);
-        when(transactionService.genEndTransactionRequestHeader(anyString(), 
commitOrRollbackCaptor.capture(), anyBoolean(), anyString(), anyString()))
+        when(transactionService.genEndTransactionRequestHeader(any(), 
anyString(), commitOrRollbackCaptor.capture(), anyBoolean(), anyString(), 
anyString()))
             .thenReturn(new EndTransactionRequestData("brokerName", new 
EndTransactionRequestHeader()));
 
         this.transactionProcessor.endTransaction(
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
index b88f6677ef..9d897642fd 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
@@ -76,7 +76,7 @@ public class SendMessageActivityTest extends InitConfigTest {
 
     @Test
     public void testSendMessage() throws Exception {
-        
when(metadataServiceMock.getTopicMessageType(eq(topic))).thenReturn(TopicMessageType.NORMAL);
+        when(metadataServiceMock.getTopicMessageType(any(), 
eq(topic))).thenReturn(TopicMessageType.NORMAL);
         Message message = new Message(topic, "123".getBytes());
         message.putUserProperty("a", "b");
         SendMessageRequestHeader sendMessageRequestHeader = new 
SendMessageRequestHeader();
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
index baecd47428..c97bd5a72b 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
@@ -35,6 +35,7 @@ import org.junit.Ignore;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -76,8 +77,8 @@ public class BaseServiceTest extends InitConfigTest {
         brokerData.setBrokerAddrs(brokerAddrs);
         topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData));
 
-        
when(this.topicRouteService.getAllMessageQueueView(eq(ERR_TOPIC))).thenThrow(new
 MQClientException(ResponseCode.TOPIC_NOT_EXIST, ""));
-        
when(this.topicRouteService.getAllMessageQueueView(eq(TOPIC))).thenReturn(new 
MessageQueueView(TOPIC, topicRouteData));
-        
when(this.topicRouteService.getAllMessageQueueView(eq(CLUSTER_NAME))).thenReturn(new
 MessageQueueView(CLUSTER_NAME, topicRouteData));
+        when(this.topicRouteService.getAllMessageQueueView(any(), 
eq(ERR_TOPIC))).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, 
""));
+        when(this.topicRouteService.getAllMessageQueueView(any(), 
eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData));
+        when(this.topicRouteService.getAllMessageQueueView(any(), 
eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, 
topicRouteData));
     }
 }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/ClusterMessageServiceTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/ClusterMessageServiceTest.java
index 734207f025..7e4d25f0c0 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/ClusterMessageServiceTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/ClusterMessageServiceTest.java
@@ -32,6 +32,7 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -50,7 +51,7 @@ public class ClusterMessageServiceTest {
 
     @Test
     public void testAckMessageByInvalidBrokerNameHandle() throws Exception {
-        when(topicRouteService.getBrokerAddr(anyString())).thenThrow(new 
MQClientException(ResponseCode.TOPIC_NOT_EXIST, ""));
+        when(topicRouteService.getBrokerAddr(any(), 
anyString())).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST, 
""));
         try {
             this.clusterMessageService.ackMessage(
                 ProxyContext.create(),
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
index 50afbc48f8..98bf1104f8 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.service.metadata;
 
 import java.util.HashMap;
 import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.service.BaseServiceTest;
 import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
@@ -54,17 +55,19 @@ public class ClusterMetadataServiceTest extends 
BaseServiceTest {
 
     @Test
     public void testGetTopicMessageType() {
-        assertEquals(TopicMessageType.UNSPECIFIED, 
this.clusterMetadataService.getTopicMessageType(ERR_TOPIC));
+        ProxyContext ctx = ProxyContext.create();
+        assertEquals(TopicMessageType.UNSPECIFIED, 
this.clusterMetadataService.getTopicMessageType(ctx, ERR_TOPIC));
         assertEquals(1, 
this.clusterMetadataService.topicConfigCache.asMap().size());
-        assertEquals(TopicMessageType.UNSPECIFIED, 
this.clusterMetadataService.getTopicMessageType(ERR_TOPIC));
+        assertEquals(TopicMessageType.UNSPECIFIED, 
this.clusterMetadataService.getTopicMessageType(ctx, ERR_TOPIC));
 
-        assertEquals(TopicMessageType.NORMAL, 
this.clusterMetadataService.getTopicMessageType(TOPIC));
+        assertEquals(TopicMessageType.NORMAL, 
this.clusterMetadataService.getTopicMessageType(ctx, TOPIC));
         assertEquals(2, 
this.clusterMetadataService.topicConfigCache.asMap().size());
     }
 
     @Test
     public void testGetSubscriptionGroupConfig() {
-        
assertNotNull(this.clusterMetadataService.getSubscriptionGroupConfig(GROUP));
+        ProxyContext ctx = ProxyContext.create();
+        
assertNotNull(this.clusterMetadataService.getSubscriptionGroupConfig(ctx, 
GROUP));
         assertEquals(1, 
this.clusterMetadataService.subscriptionGroupConfigCache.asMap().size());
     }
 }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
index 9b27eb56c4..b5fc1b6713 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
 import org.apache.rocketmq.proxy.common.Address;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.service.BaseServiceTest;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.assertj.core.util.Lists;
@@ -61,18 +62,20 @@ public class ClusterTopicRouteServiceTest extends 
BaseServiceTest {
 
     @Test
     public void testGetCurrentMessageQueueView() throws Throwable {
-        MQClientException exception = catchThrowableOfType(() -> 
this.topicRouteService.getCurrentMessageQueueView(ERR_TOPIC), 
MQClientException.class);
+        ProxyContext ctx = ProxyContext.create();
+        MQClientException exception = catchThrowableOfType(() -> 
this.topicRouteService.getCurrentMessageQueueView(ctx, ERR_TOPIC), 
MQClientException.class);
         assertTrue(TopicRouteHelper.isTopicNotExistError(exception));
         assertEquals(1, this.topicRouteService.topicCache.asMap().size());
 
-        
assertNotNull(this.topicRouteService.getCurrentMessageQueueView(TOPIC));
+        assertNotNull(this.topicRouteService.getCurrentMessageQueueView(ctx, 
TOPIC));
         assertEquals(2, this.topicRouteService.topicCache.asMap().size());
     }
 
     @Test
     public void testGetTopicRouteForProxy() throws Throwable {
+        ProxyContext ctx = ProxyContext.create();
         List<Address> addressList = Lists.newArrayList(new 
Address(Address.AddressScheme.IPv4, HostAndPort.fromParts("127.0.0.1", 8888)));
-        ProxyTopicRouteData proxyTopicRouteData = 
this.topicRouteService.getTopicRouteForProxy(addressList, TOPIC);
+        ProxyTopicRouteData proxyTopicRouteData = 
this.topicRouteService.getTopicRouteForProxy(ctx, addressList, TOPIC);
 
         assertEquals(1, proxyTopicRouteData.getBrokerDatas().size());
         assertEquals(addressList, 
proxyTopicRouteData.getBrokerDatas().get(0).getBrokerAddrs().get(MixAll.MASTER_ID));
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java
index 4948cddc2e..1ad39a1db6 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.proxy.common.Address;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.service.BaseServiceTest;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
@@ -76,8 +77,9 @@ public class LocalTopicRouteServiceTest extends 
BaseServiceTest {
 
     @Test
     public void testGetCurrentMessageQueueView() throws Throwable {
+        ProxyContext ctx = ProxyContext.create();
         this.topicConfigTable.put(TOPIC, new TopicConfig(TOPIC, 3, 2, 
PermName.PERM_WRITE | PermName.PERM_READ));
-        MessageQueueView messageQueueView = 
this.topicRouteService.getCurrentMessageQueueView(TOPIC);
+        MessageQueueView messageQueueView = 
this.topicRouteService.getCurrentMessageQueueView(ctx, TOPIC);
         assertEquals(3, messageQueueView.getReadSelector().getQueues().size());
         assertEquals(2, 
messageQueueView.getWriteSelector().getQueues().size());
         assertEquals(1, 
messageQueueView.getReadSelector().getBrokerActingQueues().size());
@@ -90,7 +92,8 @@ public class LocalTopicRouteServiceTest extends 
BaseServiceTest {
 
     @Test
     public void testGetTopicRouteForProxy() throws Throwable {
-        ProxyTopicRouteData proxyTopicRouteData = 
this.topicRouteService.getTopicRouteForProxy(new ArrayList<>(), TOPIC);
+        ProxyContext ctx = ProxyContext.create();
+        ProxyTopicRouteData proxyTopicRouteData = 
this.topicRouteService.getTopicRouteForProxy(ctx, new ArrayList<>(), TOPIC);
 
         assertEquals(1, proxyTopicRouteData.getBrokerDatas().size());
         assertEquals(
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 6540523598..6373aba308 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -133,7 +133,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
             brokerData.setBrokerAddrs(brokerAddr);
             topicRouteData.getBrokerDatas().add(brokerData);
             MessageQueueView messageQueueView = new MessageQueueView("foo", 
topicRouteData);
-            
when(this.topicRouteService.getAllMessageQueueView(anyString())).thenReturn(messageQueueView);
+            when(this.topicRouteService.getAllMessageQueueView(any(), 
anyString())).thenReturn(messageQueueView);
         }
     }
 
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionServiceTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionServiceTest.java
index 6e4af2e8f6..81de5ec843 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionServiceTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionServiceTest.java
@@ -37,6 +37,7 @@ public class AbstractTransactionServiceTest extends 
InitConfigTest {
     private static final String BROKER_NAME = "mockBroker";
     private static final String PRODUCER_GROUP = "producerGroup";
     private static final Random RANDOM = new Random();
+    private final ProxyContext ctx = 
ProxyContext.createForInner(this.getClass());
 
     public static class MockAbstractTransactionServiceTest extends 
AbstractTransactionService {
 
@@ -46,22 +47,22 @@ public class AbstractTransactionServiceTest extends 
InitConfigTest {
         }
 
         @Override
-        public void addTransactionSubscription(String group, List<String> 
topicList) {
+        public void addTransactionSubscription(ProxyContext ctx, String group, 
List<String> topicList) {
 
         }
 
         @Override
-        public void addTransactionSubscription(String group, String topic) {
+        public void addTransactionSubscription(ProxyContext ctx, String group, 
String topic) {
 
         }
 
         @Override
-        public void replaceTransactionSubscription(String group, List<String> 
topicList) {
+        public void replaceTransactionSubscription(ProxyContext ctx, String 
group, List<String> topicList) {
 
         }
 
         @Override
-        public void unSubscribeAllTransactionTopic(String group) {
+        public void unSubscribeAllTransactionTopic(ProxyContext ctx, String 
group) {
 
         }
     }
@@ -81,6 +82,7 @@ public class AbstractTransactionServiceTest extends 
InitConfigTest {
         String txId = MessageClientIDSetter.createUniqID();
 
         TransactionData transactionData = 
transactionService.addTransactionDataByBrokerName(
+            ctx,
             BROKER_NAME,
             PRODUCER_GROUP,
             RANDOM.nextLong(),
@@ -91,6 +93,7 @@ public class AbstractTransactionServiceTest extends 
InitConfigTest {
         assertNotNull(transactionData);
 
         EndTransactionRequestData requestData = 
transactionService.genEndTransactionRequestHeader(
+            ctx,
             PRODUCER_GROUP,
             MessageSysFlag.TRANSACTION_COMMIT_TYPE,
             true,
@@ -104,6 +107,7 @@ public class AbstractTransactionServiceTest extends 
InitConfigTest {
         assertEquals(transactionData.getTranStateTableOffset(), 
requestData.getRequestHeader().getTranStateTableOffset().longValue());
 
         assertNull(transactionService.genEndTransactionRequestHeader(
+            ctx,
             "group",
             MessageSysFlag.TRANSACTION_COMMIT_TYPE,
             true,
@@ -119,6 +123,7 @@ public class AbstractTransactionServiceTest extends 
InitConfigTest {
         String txId = MessageClientIDSetter.createUniqID();
 
         TransactionData transactionData = 
transactionService.addTransactionDataByBrokerName(
+            ctx,
             BROKER_NAME,
             PRODUCER_GROUP,
             RANDOM.nextLong(),
@@ -128,6 +133,7 @@ public class AbstractTransactionServiceTest extends 
InitConfigTest {
         );
         
transactionService.onSendCheckTransactionStateFailed(ProxyContext.createForInner(this.getClass()),
 PRODUCER_GROUP, transactionData);
         assertNull(transactionService.genEndTransactionRequestHeader(
+            ctx,
             PRODUCER_GROUP,
             MessageSysFlag.TRANSACTION_COMMIT_TYPE,
             true,
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
index 2b56839301..a0063544ec 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.rocketmq.broker.client.ProducerManager;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.service.BaseServiceTest;
 import org.apache.rocketmq.proxy.service.route.MessageQueueView;
@@ -44,6 +45,7 @@ import org.mockito.Mockito;
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
@@ -53,7 +55,7 @@ public class ClusterTransactionServiceTest extends 
BaseServiceTest {
 
     @Mock
     private ProducerManager producerManager;
-
+    private ProxyContext ctx = ProxyContext.create();
     private ClusterTransactionService clusterTransactionService;
 
     @Before
@@ -63,7 +65,7 @@ public class ClusterTransactionServiceTest extends 
BaseServiceTest {
             this.mqClientAPIFactory);
 
         MessageQueueView messageQueueView = new MessageQueueView(TOPIC, 
topicRouteData);
-        when(this.topicRouteService.getAllMessageQueueView(anyString()))
+        when(this.topicRouteService.getAllMessageQueueView(any(), anyString()))
             .thenReturn(messageQueueView);
 
         when(mqClientAPIFactory.getClient()).thenReturn(mqClientAPIExt);
@@ -71,7 +73,7 @@ public class ClusterTransactionServiceTest extends 
BaseServiceTest {
 
     @Test
     public void testAddTransactionSubscription() {
-        this.clusterTransactionService.addTransactionSubscription(GROUP, 
TOPIC);
+        this.clusterTransactionService.addTransactionSubscription(ctx, GROUP, 
TOPIC);
 
         assertEquals(1, 
this.clusterTransactionService.getGroupClusterData().size());
         assertEquals(CLUSTER_NAME, 
this.clusterTransactionService.getGroupClusterData().get(GROUP).stream().findAny().get().getCluster());
@@ -79,7 +81,7 @@ public class ClusterTransactionServiceTest extends 
BaseServiceTest {
 
     @Test
     public void testAddTransactionSubscriptionTopicList() {
-        this.clusterTransactionService.addTransactionSubscription(GROUP, 
Lists.newArrayList(TOPIC + 1, TOPIC + 2));
+        this.clusterTransactionService.addTransactionSubscription(ctx, GROUP, 
Lists.newArrayList(TOPIC + 1, TOPIC + 2));
 
         assertEquals(1, 
this.clusterTransactionService.getGroupClusterData().size());
         assertEquals(CLUSTER_NAME, 
this.clusterTransactionService.getGroupClusterData().get(GROUP).stream().findAny().get().getCluster());
@@ -87,21 +89,21 @@ public class ClusterTransactionServiceTest extends 
BaseServiceTest {
 
     @Test
     public void testReplaceTransactionSubscription() {
-        this.clusterTransactionService.addTransactionSubscription(GROUP, 
TOPIC);
+        this.clusterTransactionService.addTransactionSubscription(ctx, GROUP, 
TOPIC);
 
         assertEquals(1, 
this.clusterTransactionService.getGroupClusterData().size());
         assertEquals(CLUSTER_NAME, 
this.clusterTransactionService.getGroupClusterData().get(GROUP).stream().findAny().get().getCluster());
 
         this.brokerData.setCluster(CLUSTER_NAME + 1);
-        this.clusterTransactionService.replaceTransactionSubscription(GROUP, 
Lists.newArrayList(TOPIC + 1));
+        this.clusterTransactionService.replaceTransactionSubscription(ctx, 
GROUP, Lists.newArrayList(TOPIC + 1));
         assertEquals(1, 
this.clusterTransactionService.getGroupClusterData().size());
         assertEquals(CLUSTER_NAME + 1, 
this.clusterTransactionService.getGroupClusterData().get(GROUP).stream().findAny().get().getCluster());
     }
 
     @Test
     public void testUnSubscribeAllTransactionTopic() {
-        this.clusterTransactionService.addTransactionSubscription(GROUP, 
TOPIC);
-        this.clusterTransactionService.unSubscribeAllTransactionTopic(GROUP);
+        this.clusterTransactionService.addTransactionSubscription(ctx, GROUP, 
TOPIC);
+        this.clusterTransactionService.unSubscribeAllTransactionTopic(ctx, 
GROUP);
 
         assertEquals(0, 
this.clusterTransactionService.getGroupClusterData().size());
     }
@@ -125,7 +127,7 @@ public class ClusterTransactionServiceTest extends 
BaseServiceTest {
         brokerData.setBrokerAddrs(brokerAddrs);
         topicRouteData.getQueueDatas().add(queueData);
         topicRouteData.getBrokerDatas().add(brokerData);
-        
when(this.topicRouteService.getAllMessageQueueView(eq(TOPIC))).thenReturn(new 
MessageQueueView(TOPIC, topicRouteData));
+        when(this.topicRouteService.getAllMessageQueueView(any(), 
eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData));
 
         TopicRouteData clusterTopicRouteData = new TopicRouteData();
         QueueData clusterQueueData = new QueueData();
@@ -139,7 +141,7 @@ public class ClusterTransactionServiceTest extends 
BaseServiceTest {
         brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR);
         clusterBrokerData.setBrokerAddrs(brokerAddrs);
         
clusterTopicRouteData.setBrokerDatas(Lists.newArrayList(clusterBrokerData));
-        
when(this.topicRouteService.getAllMessageQueueView(eq(CLUSTER_NAME))).thenReturn(new
 MessageQueueView(CLUSTER_NAME, clusterTopicRouteData));
+        when(this.topicRouteService.getAllMessageQueueView(any(), 
eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME, 
clusterTopicRouteData));
 
         TopicRouteData clusterTopicRouteData2 = new TopicRouteData();
         QueueData clusterQueueData2 = new QueueData();
@@ -153,7 +155,7 @@ public class ClusterTransactionServiceTest extends 
BaseServiceTest {
         brokerAddrs.put(MixAll.MASTER_ID, brokerAddr2);
         clusterBrokerData2.setBrokerAddrs(brokerAddrs);
         
clusterTopicRouteData2.setBrokerDatas(Lists.newArrayList(clusterBrokerData2));
-        
when(this.topicRouteService.getAllMessageQueueView(eq(clusterName2))).thenReturn(new
 MessageQueueView(clusterName2, clusterTopicRouteData2));
+        when(this.topicRouteService.getAllMessageQueueView(any(), 
eq(clusterName2))).thenReturn(new MessageQueueView(clusterName2, 
clusterTopicRouteData2));
 
         
ConfigurationManager.getProxyConfig().setTransactionHeartbeatBatchNum(2);
         this.clusterTransactionService.start();
@@ -161,7 +163,7 @@ public class ClusterTransactionServiceTest extends 
BaseServiceTest {
 
         for (int i = 0; i < 3; i++) {
             groupSet.add(GROUP + i);
-            this.clusterTransactionService.addTransactionSubscription(GROUP + 
i, TOPIC);
+            this.clusterTransactionService.addTransactionSubscription(ctx, 
GROUP + i, TOPIC);
         }
 
         ArgumentCaptor<String> brokerAddrArgumentCaptor = 
ArgumentCaptor.forClass(String.class);

Reply via email to