This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a325d144b2 [ISSUE #6858] passing through ProxyContext for future
expansion (#6859)
a325d144b2 is described below
commit a325d144b24a1acdc92a5ac308865080532325d9
Author: lk <[email protected]>
AuthorDate: Tue Jun 6 17:07:06 2023 +0800
[ISSUE #6858] passing through ProxyContext for future expansion (#6859)
---
.../proxy/grpc/v2/client/ClientActivity.java | 2 +-
.../proxy/grpc/v2/consumer/AckMessageActivity.java | 2 +-
.../consumer/ChangeInvisibleDurationActivity.java | 2 +-
.../grpc/v2/consumer/ReceiveMessageActivity.java | 2 +-
.../v2/producer/ForwardMessageToDLQActivity.java | 2 +-
.../proxy/grpc/v2/route/RouteActivity.java | 2 +-
.../proxy/processor/ConsumerProcessor.java | 20 ++++++------
.../proxy/processor/DefaultMessagingProcessor.java | 4 +--
.../proxy/processor/ProducerProcessor.java | 9 ++---
.../proxy/processor/ReceiptHandleProcessor.java | 14 ++++----
.../proxy/processor/TransactionProcessor.java | 3 +-
.../remoting/activity/SendMessageActivity.java | 2 +-
.../proxy/service/ClusterServiceManager.java | 3 +-
.../service/message/ClusterMessageService.java | 20 ++++++------
.../service/metadata/ClusterMetadataService.java | 7 ++--
.../service/metadata/LocalMetadataService.java | 5 +--
.../proxy/service/metadata/MetadataService.java | 5 +--
.../service/relay/AbstractProxyRelayService.java | 1 +
.../service/route/ClusterTopicRouteService.java | 17 +++++-----
.../service/route/LocalTopicRouteService.java | 13 ++++----
.../proxy/service/route/TopicRouteService.java | 11 ++++---
.../sysmessage/AbstractSystemMessageSyncer.java | 3 +-
.../transaction/AbstractTransactionService.java | 8 ++---
.../transaction/ClusterTransactionService.java | 21 ++++++------
.../transaction/LocalTransactionService.java | 9 ++---
.../service/transaction/TransactionService.java | 14 ++++----
.../proxy/grpc/v2/client/ClientActivityTest.java | 4 +--
.../ChangeInvisibleDurationActivityTest.java | 2 +-
.../producer/ForwardMessageToDLQActivityTest.java | 2 +-
.../proxy/grpc/v2/route/RouteActivityTest.java | 2 +-
.../proxy/processor/ConsumerProcessorTest.java | 15 +++++----
.../proxy/processor/ProducerProcessorTest.java | 4 ++-
.../processor/ReceiptHandleProcessorTest.java | 38 +++++++++++-----------
.../proxy/processor/TransactionProcessorTest.java | 2 +-
.../remoting/activity/SendMessageActivityTest.java | 2 +-
.../rocketmq/proxy/service/BaseServiceTest.java | 7 ++--
.../service/message/ClusterMessageServiceTest.java | 3 +-
.../metadata/ClusterMetadataServiceTest.java | 11 ++++---
.../route/ClusterTopicRouteServiceTest.java | 9 +++--
.../service/route/LocalTopicRouteServiceTest.java | 7 ++--
.../service/sysmessage/HeartbeatSyncerTest.java | 2 +-
.../AbstractTransactionServiceTest.java | 14 +++++---
.../transaction/ClusterTransactionServiceTest.java | 26 ++++++++-------
43 files changed, 193 insertions(+), 158 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
index de8fba4a63..a60228eb9f 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivity.java
@@ -287,7 +287,7 @@ public class ClientActivity extends AbstractMessingActivity
{
// use topic name as producer group
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel,
clientId, languageCode, parseClientVersion(ctx.getClientVersion()));
this.messagingProcessor.registerProducer(ctx, topicName,
clientChannelInfo);
- TopicMessageType topicMessageType =
this.messagingProcessor.getMetadataService().getTopicMessageType(topicName);
+ TopicMessageType topicMessageType =
this.messagingProcessor.getMetadataService().getTopicMessageType(ctx,
topicName);
if (TopicMessageType.TRANSACTION.equals(topicMessageType)) {
this.messagingProcessor.addTransactionSubscription(ctx, topicName,
topicName);
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
index fb31a60624..993f069b94 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
@@ -98,7 +98,7 @@ public class AckMessageActivity extends
AbstractMessingActivity {
String handleString = ackMessageEntry.getReceiptHandle();
String group =
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
- MessageReceiptHandle messageReceiptHandle =
receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
+ MessageReceiptHandle messageReceiptHandle =
receiptHandleProcessor.removeReceiptHandle(ctx,
grpcChannelManager.getChannel(ctx.getClientID()), group,
ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
if (messageReceiptHandle != null) {
handleString = messageReceiptHandle.getReceiptHandleStr();
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
index 0f33cc7aa7..9b7e947e0b 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivity.java
@@ -55,7 +55,7 @@ public class ChangeInvisibleDurationActivity extends
AbstractMessingActivity {
ReceiptHandle receiptHandle =
ReceiptHandle.decode(request.getReceiptHandle());
String group =
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
- MessageReceiptHandle messageReceiptHandle =
receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
group, request.getMessageId(), receiptHandle.getReceiptHandle());
+ MessageReceiptHandle messageReceiptHandle =
receiptHandleProcessor.removeReceiptHandle(ctx,
grpcChannelManager.getChannel(ctx.getClientID()), group,
request.getMessageId(), receiptHandle.getReceiptHandle());
if (messageReceiptHandle != null) {
receiptHandle =
ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index 9df4101f73..22a149004c 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -144,7 +144,7 @@ public class ReceiveMessageActivity extends
AbstractMessingActivity {
MessageReceiptHandle messageReceiptHandle =
new MessageReceiptHandle(group, topic,
messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
messageExt.getQueueOffset(),
messageExt.getReconsumeTimes());
-
receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
+
receiptHandleProcessor.addReceiptHandle(ctx,
grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(),
receiptHandle, messageReceiptHandle);
}
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
index 789927d693..6b5c5c7e07 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivity.java
@@ -48,7 +48,7 @@ public class ForwardMessageToDLQActivity extends
AbstractMessingActivity {
String group =
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
String handleString = request.getReceiptHandle();
- MessageReceiptHandle messageReceiptHandle =
receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()),
group, request.getMessageId(), request.getReceiptHandle());
+ MessageReceiptHandle messageReceiptHandle =
receiptHandleProcessor.removeReceiptHandle(ctx,
grpcChannelManager.getChannel(ctx.getClientID()), group,
request.getMessageId(), request.getReceiptHandle());
if (messageReceiptHandle != null) {
handleString = messageReceiptHandle.getReceiptHandleStr();
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
index eb7385f874..c5d485691b 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivity.java
@@ -71,7 +71,7 @@ public class RouteActivity extends AbstractMessingActivity {
List<MessageQueue> messageQueueList = new ArrayList<>();
Map<String, Map<Long, Broker>> brokerMap =
buildBrokerMap(proxyTopicRouteData.getBrokerDatas());
- TopicMessageType topicMessageType =
messagingProcessor.getMetadataService().getTopicMessageType(topicName);
+ TopicMessageType topicMessageType =
messagingProcessor.getMetadataService().getTopicMessageType(ctx, topicName);
for (QueueData queueData : proxyTopicRouteData.getQueueDatas()) {
String brokerName = queueData.getBrokerName();
Map<Long, Broker> brokerIdMap = brokerMap.get(brokerName);
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index d67f4b855d..c860ee8a1a 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -87,7 +87,7 @@ public class ConsumerProcessor extends AbstractProcessor {
) {
CompletableFuture<PopResult> future = new CompletableFuture<>();
try {
- AddressableMessageQueue messageQueue = queueSelector.select(ctx,
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(topic));
+ AddressableMessageQueue messageQueue = queueSelector.select(ctx,
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx,
topic));
if (messageQueue == null) {
throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no
readable queue");
}
@@ -287,7 +287,7 @@ public class ConsumerProcessor extends AbstractProcessor {
CompletableFuture<PullResult> future = new CompletableFuture<>();
try {
AddressableMessageQueue addressableMessageQueue =
serviceManager.getTopicRouteService()
- .buildAddressableMessageQueue(messageQueue);
+ .buildAddressableMessageQueue(ctx, messageQueue);
PullMessageRequestHeader requestHeader = new
PullMessageRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(addressableMessageQueue.getTopic());
@@ -311,7 +311,7 @@ public class ConsumerProcessor extends AbstractProcessor {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
AddressableMessageQueue addressableMessageQueue =
serviceManager.getTopicRouteService()
- .buildAddressableMessageQueue(messageQueue);
+ .buildAddressableMessageQueue(ctx, messageQueue);
UpdateConsumerOffsetRequestHeader requestHeader = new
UpdateConsumerOffsetRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(addressableMessageQueue.getTopic());
@@ -329,7 +329,7 @@ public class ConsumerProcessor extends AbstractProcessor {
CompletableFuture<Long> future = new CompletableFuture<>();
try {
AddressableMessageQueue addressableMessageQueue =
serviceManager.getTopicRouteService()
- .buildAddressableMessageQueue(messageQueue);
+ .buildAddressableMessageQueue(ctx, messageQueue);
QueryConsumerOffsetRequestHeader requestHeader = new
QueryConsumerOffsetRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(addressableMessageQueue.getTopic());
@@ -345,7 +345,7 @@ public class ConsumerProcessor extends AbstractProcessor {
String consumerGroup, String clientId, long timeoutMillis) {
CompletableFuture<Set<MessageQueue>> future = new
CompletableFuture<>();
Set<MessageQueue> successSet = new CopyOnWriteArraySet<>();
- Set<AddressableMessageQueue> addressableMessageQueueSet =
buildAddressableSet(mqSet);
+ Set<AddressableMessageQueue> addressableMessageQueueSet =
buildAddressableSet(ctx, mqSet);
Map<String, List<AddressableMessageQueue>> messageQueueSetMap =
buildAddressableMapByBrokerName(addressableMessageQueueSet);
List<CompletableFuture<Void>> futureList = new ArrayList<>();
messageQueueSetMap.forEach((k, v) -> {
@@ -370,7 +370,7 @@ public class ConsumerProcessor extends AbstractProcessor {
public CompletableFuture<Void> unlockBatchMQ(ProxyContext ctx,
Set<MessageQueue> mqSet,
String consumerGroup, String clientId, long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
- Set<AddressableMessageQueue> addressableMessageQueueSet =
buildAddressableSet(mqSet);
+ Set<AddressableMessageQueue> addressableMessageQueueSet =
buildAddressableSet(ctx, mqSet);
Map<String, List<AddressableMessageQueue>> messageQueueSetMap =
buildAddressableMapByBrokerName(addressableMessageQueueSet);
List<CompletableFuture<Void>> futureList = new ArrayList<>();
messageQueueSetMap.forEach((k, v) -> {
@@ -394,7 +394,7 @@ public class ConsumerProcessor extends AbstractProcessor {
CompletableFuture<Long> future = new CompletableFuture<>();
try {
AddressableMessageQueue addressableMessageQueue =
serviceManager.getTopicRouteService()
- .buildAddressableMessageQueue(messageQueue);
+ .buildAddressableMessageQueue(ctx, messageQueue);
GetMaxOffsetRequestHeader requestHeader = new
GetMaxOffsetRequestHeader();
requestHeader.setTopic(addressableMessageQueue.getTopic());
requestHeader.setQueueId(addressableMessageQueue.getQueueId());
@@ -409,7 +409,7 @@ public class ConsumerProcessor extends AbstractProcessor {
CompletableFuture<Long> future = new CompletableFuture<>();
try {
AddressableMessageQueue addressableMessageQueue =
serviceManager.getTopicRouteService()
- .buildAddressableMessageQueue(messageQueue);
+ .buildAddressableMessageQueue(ctx, messageQueue);
GetMinOffsetRequestHeader requestHeader = new
GetMinOffsetRequestHeader();
requestHeader.setTopic(addressableMessageQueue.getTopic());
requestHeader.setQueueId(addressableMessageQueue.getQueueId());
@@ -420,10 +420,10 @@ public class ConsumerProcessor extends AbstractProcessor {
return FutureUtils.addExecutor(future, this.executor);
}
- protected Set<AddressableMessageQueue>
buildAddressableSet(Set<MessageQueue> mqSet) {
+ protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext
ctx, Set<MessageQueue> mqSet) {
return mqSet.stream().map(mq -> {
try {
- return
serviceManager.getTopicRouteService().buildAddressableMessageQueue(mq);
+ return
serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq);
} catch (Exception e) {
return null;
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index bfadc0d3e0..81d2b9df35 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -127,13 +127,13 @@ public class DefaultMessagingProcessor extends
AbstractStartAndShutdown implemen
@Override
public SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext
ctx, String consumerGroupName) {
- return
this.serviceManager.getMetadataService().getSubscriptionGroupConfig(consumerGroupName);
+ return
this.serviceManager.getMetadataService().getSubscriptionGroupConfig(ctx,
consumerGroupName);
}
@Override
public ProxyTopicRouteData getTopicRouteDataForProxy(ProxyContext ctx,
List<Address> requestHostAndPortList,
String topicName) throws Exception {
- return
this.serviceManager.getTopicRouteService().getTopicRouteForProxy(requestHostAndPortList,
topicName);
+ return
this.serviceManager.getTopicRouteService().getTopicRouteForProxy(ctx,
requestHostAndPortList, topicName);
}
@Override
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
index 749f9da2be..0d0c621686 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java
@@ -73,14 +73,14 @@ public class ProducerProcessor extends AbstractProcessor {
if (topicMessageTypeValidator != null) {
// Do not check retry or dlq topic
if (!NamespaceUtil.isRetryTopic(topic) &&
!NamespaceUtil.isDLQTopic(topic)) {
- TopicMessageType topicMessageType =
serviceManager.getMetadataService().getTopicMessageType(topic);
+ TopicMessageType topicMessageType =
serviceManager.getMetadataService().getTopicMessageType(ctx, topic);
TopicMessageType messageType =
TopicMessageType.parseFromMessageProperty(message.getProperties());
topicMessageTypeValidator.validate(topicMessageType,
messageType);
}
}
}
AddressableMessageQueue messageQueue = queueSelector.select(ctx,
-
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(topic));
+
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx,
topic));
if (messageQueue == null) {
throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no
writable queue");
}
@@ -102,7 +102,7 @@ public class ProducerProcessor extends AbstractProcessor {
if
(SendStatus.SEND_OK.equals(sendResult.getSendStatus()) &&
tranType ==
MessageSysFlag.TRANSACTION_PREPARED_TYPE &&
StringUtils.isNotBlank(sendResult.getTransactionId())) {
- fillTransactionData(producerGroup, messageQueue,
sendResult, messageList);
+ fillTransactionData(ctx, producerGroup,
messageQueue, sendResult, messageList);
}
}
return sendResultList;
@@ -113,7 +113,7 @@ public class ProducerProcessor extends AbstractProcessor {
return FutureUtils.addExecutor(future, this.executor);
}
- protected void fillTransactionData(String producerGroup,
AddressableMessageQueue messageQueue, SendResult sendResult, List<Message>
messageList) {
+ protected void fillTransactionData(ProxyContext ctx, String producerGroup,
AddressableMessageQueue messageQueue, SendResult sendResult, List<Message>
messageList) {
try {
MessageId id;
if (sendResult.getOffsetMsgId() != null) {
@@ -122,6 +122,7 @@ public class ProducerProcessor extends AbstractProcessor {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
this.serviceManager.getTransactionService().addTransactionDataByBrokerName(
+ ctx,
messageQueue.getBrokerName(),
producerGroup,
sendResult.getQueueOffset(),
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index c903220bbe..7fe97db798 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -201,7 +201,7 @@ public class ReceiptHandleProcessor extends
AbstractStartAndShutdown {
});
} else {
SubscriptionGroupConfig subscriptionGroupConfig =
-
messagingProcessor.getMetadataService().getSubscriptionGroupConfig(messageReceiptHandle.getGroup());
+
messagingProcessor.getMetadataService().getSubscriptionGroupConfig(context,
messageReceiptHandle.getGroup());
if (subscriptionGroupConfig == null) {
log.error("group's subscriptionGroupConfig is null when
renew. handle: {}", messageReceiptHandle);
return CompletableFuture.completedFuture(null);
@@ -240,12 +240,12 @@ public class ReceiptHandleProcessor extends
AbstractStartAndShutdown {
return
this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"),
groupKey.group, groupKey.channel) == null;
}
- public void addReceiptHandle(Channel channel, String group, String msgID,
String receiptHandle,
+ public void addReceiptHandle(ProxyContext ctx, Channel channel, String
group, String msgID, String receiptHandle,
MessageReceiptHandle messageReceiptHandle) {
- this.addReceiptHandle(new ReceiptHandleGroupKey(channel, group),
msgID, receiptHandle, messageReceiptHandle);
+ this.addReceiptHandle(ctx, new ReceiptHandleGroupKey(channel, group),
msgID, receiptHandle, messageReceiptHandle);
}
- protected void addReceiptHandle(ReceiptHandleGroupKey key, String msgID,
String receiptHandle,
+ protected void addReceiptHandle(ProxyContext ctx, ReceiptHandleGroupKey
key, String msgID, String receiptHandle,
MessageReceiptHandle messageReceiptHandle) {
if (key == null) {
return;
@@ -254,11 +254,11 @@ public class ReceiptHandleProcessor extends
AbstractStartAndShutdown {
k -> new ReceiptHandleGroup()).put(msgID, receiptHandle,
messageReceiptHandle);
}
- public MessageReceiptHandle removeReceiptHandle(Channel channel, String
group, String msgID, String receiptHandle) {
- return this.removeReceiptHandle(new ReceiptHandleGroupKey(channel,
group), msgID, receiptHandle);
+ public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel
channel, String group, String msgID, String receiptHandle) {
+ return this.removeReceiptHandle(ctx, new
ReceiptHandleGroupKey(channel, group), msgID, receiptHandle);
}
- protected MessageReceiptHandle removeReceiptHandle(ReceiptHandleGroupKey
key, String msgID, String receiptHandle) {
+ protected MessageReceiptHandle removeReceiptHandle(ProxyContext ctx,
ReceiptHandleGroupKey key, String msgID, String receiptHandle) {
if (key == null) {
return null;
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/TransactionProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/TransactionProcessor.java
index 3b284cd056..c0ba255f54 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/TransactionProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/TransactionProcessor.java
@@ -36,6 +36,7 @@ public class TransactionProcessor extends AbstractProcessor {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
EndTransactionRequestData headerData =
serviceManager.getTransactionService().genEndTransactionRequestHeader(
+ ctx,
producerGroup,
buildCommitOrRollback(transactionStatus),
fromTransactionCheck,
@@ -70,6 +71,6 @@ public class TransactionProcessor extends AbstractProcessor {
}
public void addTransactionSubscription(ProxyContext ctx, String
producerGroup, String topic) {
-
this.serviceManager.getTransactionService().addTransactionSubscription(producerGroup,
topic);
+
this.serviceManager.getTransactionService().addTransactionSubscription(ctx,
producerGroup, topic);
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
index 618d458743..17af0fdcb3 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivity.java
@@ -70,7 +70,7 @@ public class SendMessageActivity extends
AbstractRemotingActivity {
if (topicMessageTypeValidator != null) {
// Do not check retry or dlq topic
if (!NamespaceUtil.isRetryTopic(topic) &&
!NamespaceUtil.isDLQTopic(topic)) {
- TopicMessageType topicMessageType =
messagingProcessor.getMetadataService().getTopicMessageType(topic);
+ TopicMessageType topicMessageType =
messagingProcessor.getMetadataService().getTopicMessageType(context, topic);
topicMessageTypeValidator.validate(topicMessageType,
messageType);
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index 20beeb5666..95cc4d1497 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.admin.AdminService;
@@ -191,7 +192,7 @@ public class ClusterServiceManager extends
AbstractStartAndShutdown implements S
@Override
public void handle(ProducerGroupEvent event, String group,
ClientChannelInfo clientChannelInfo) {
if (event == ProducerGroupEvent.GROUP_UNREGISTER) {
- getTransactionService().unSubscribeAllTransactionTopic(group);
+
getTransactionService().unSubscribeAllTransactionTopic(ProxyContext.createForInner(this.getClass()),
group);
}
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
index 7150967d45..9f163f1b98 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
@@ -80,7 +80,7 @@ public class ClusterMessageService implements MessageService {
public CompletableFuture<RemotingCommand> sendMessageBack(ProxyContext
ctx, ReceiptHandle handle, String messageId,
ConsumerSendMsgBackRequestHeader requestHeader, long timeoutMillis) {
return this.mqClientAPIFactory.getClient().sendMessageBackAsync(
- this.resolveBrokerAddrInReceiptHandle(handle),
+ this.resolveBrokerAddrInReceiptHandle(ctx, handle),
requestHeader,
timeoutMillis
);
@@ -93,7 +93,7 @@ public class ClusterMessageService implements MessageService {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
this.mqClientAPIFactory.getClient().endTransactionOneway(
- this.resolveBrokerAddr(brokerName),
+ this.resolveBrokerAddr(ctx, brokerName),
requestHeader,
"end transaction from proxy",
timeoutMillis
@@ -120,7 +120,7 @@ public class ClusterMessageService implements
MessageService {
public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx,
ReceiptHandle handle, String messageId,
ChangeInvisibleTimeRequestHeader requestHeader, long timeoutMillis) {
return this.mqClientAPIFactory.getClient().changeInvisibleTimeAsync(
- this.resolveBrokerAddrInReceiptHandle(handle),
+ this.resolveBrokerAddrInReceiptHandle(ctx, handle),
handle.getBrokerName(),
requestHeader,
timeoutMillis
@@ -131,7 +131,7 @@ public class ClusterMessageService implements
MessageService {
public CompletableFuture<AckResult> ackMessage(ProxyContext ctx,
ReceiptHandle handle, String messageId,
AckMessageRequestHeader requestHeader, long timeoutMillis) {
return this.mqClientAPIFactory.getClient().ackMessageAsync(
- this.resolveBrokerAddrInReceiptHandle(handle),
+ this.resolveBrokerAddrInReceiptHandle(ctx, handle),
requestHeader,
timeoutMillis
);
@@ -211,7 +211,7 @@ public class ClusterMessageService implements
MessageService {
public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String
brokerName, RemotingCommand request,
long timeoutMillis) {
try {
- String brokerAddress = topicRouteService.getBrokerAddr(brokerName);
+ String brokerAddress = topicRouteService.getBrokerAddr(ctx,
brokerName);
return mqClientAPIFactory.getClient().invoke(brokerAddress,
request, timeoutMillis);
} catch (Throwable t) {
return FutureUtils.completeExceptionally(t);
@@ -222,24 +222,24 @@ public class ClusterMessageService implements
MessageService {
public CompletableFuture<Void> requestOneway(ProxyContext ctx, String
brokerName, RemotingCommand request,
long timeoutMillis) {
try {
- String brokerAddress = topicRouteService.getBrokerAddr(brokerName);
+ String brokerAddress = topicRouteService.getBrokerAddr(ctx,
brokerName);
return mqClientAPIFactory.getClient().invokeOneway(brokerAddress,
request, timeoutMillis);
} catch (Throwable t) {
return FutureUtils.completeExceptionally(t);
}
}
- protected String resolveBrokerAddrInReceiptHandle(ReceiptHandle handle) {
+ protected String resolveBrokerAddrInReceiptHandle(ProxyContext ctx,
ReceiptHandle handle) {
try {
- return
this.topicRouteService.getBrokerAddr(handle.getBrokerName());
+ return this.topicRouteService.getBrokerAddr(ctx,
handle.getBrokerName());
} catch (Throwable t) {
throw new
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "cannot find broker "
+ handle.getBrokerName(), t);
}
}
- protected String resolveBrokerAddr(String brokerName) {
+ protected String resolveBrokerAddr(ProxyContext ctx, String brokerName) {
try {
- return this.topicRouteService.getBrokerAddr(brokerName);
+ return this.topicRouteService.getBrokerAddr(ctx, brokerName);
} catch (Throwable t) {
throw new ProxyException(ProxyExceptionCode.INVALID_BROKER_NAME,
"cannot find broker " + brokerName, t);
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
index 7934d3c860..bc9582ad81 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.AbstractCacheLoader;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
@@ -83,7 +84,7 @@ public class ClusterMetadataService extends
AbstractStartAndShutdown implements
}
@Override
- public TopicMessageType getTopicMessageType(String topic) {
+ public TopicMessageType getTopicMessageType(ProxyContext ctx, String
topic) {
TopicConfigAndQueueMapping topicConfigAndQueueMapping;
try {
topicConfigAndQueueMapping = topicConfigCache.get(topic);
@@ -97,7 +98,7 @@ public class ClusterMetadataService extends
AbstractStartAndShutdown implements
}
@Override
- public SubscriptionGroupConfig getSubscriptionGroupConfig(String group) {
+ public SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext
ctx, String group) {
SubscriptionGroupConfig config;
try {
config = this.subscriptionGroupConfigCache.get(group);
@@ -158,7 +159,7 @@ public class ClusterMetadataService extends
AbstractStartAndShutdown implements
protected Optional<BrokerData> findOneBroker(String topic) throws
Exception {
try {
- return
topicRouteService.getAllMessageQueueView(topic).getTopicRouteData().getBrokerDatas().stream().findAny();
+ return
topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()),
topic).getTopicRouteData().getBrokerDatas().stream().findAny();
} catch (Exception e) {
if (TopicRouteHelper.isTopicNotExistError(e)) {
return Optional.empty();
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/LocalMetadataService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/LocalMetadataService.java
index bc1d03e74b..7f3c041f25 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/LocalMetadataService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/LocalMetadataService.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.proxy.service.metadata;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
public class LocalMetadataService implements MetadataService {
@@ -30,7 +31,7 @@ public class LocalMetadataService implements MetadataService {
}
@Override
- public TopicMessageType getTopicMessageType(String topic) {
+ public TopicMessageType getTopicMessageType(ProxyContext ctx, String
topic) {
TopicConfig topicConfig =
brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (topicConfig == null) {
return TopicMessageType.UNSPECIFIED;
@@ -39,7 +40,7 @@ public class LocalMetadataService implements MetadataService {
}
@Override
- public SubscriptionGroupConfig getSubscriptionGroupConfig(String group) {
+ public SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext
ctx, String group) {
return
this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group);
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/MetadataService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/MetadataService.java
index d5e38f145b..3ee0f3eacd 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/MetadataService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/MetadataService.java
@@ -18,11 +18,12 @@
package org.apache.rocketmq.proxy.service.metadata;
import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
public interface MetadataService {
- TopicMessageType getTopicMessageType(String topic);
+ TopicMessageType getTopicMessageType(ProxyContext ctx, String topic);
- SubscriptionGroupConfig getSubscriptionGroupConfig(String group);
+ SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext ctx,
String group);
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/AbstractProxyRelayService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/AbstractProxyRelayService.java
index ed68d1d3ae..08f00bd83c 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/AbstractProxyRelayService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/relay/AbstractProxyRelayService.java
@@ -43,6 +43,7 @@ public abstract class AbstractProxyRelayService implements
ProxyRelayService {
CompletableFuture<ProxyRelayResult<Void>> future = new
CompletableFuture<>();
String group =
messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
TransactionData transactionData =
transactionService.addTransactionDataByBrokerAddr(
+ context,
command.getExtFields().get(ProxyUtils.BROKER_ADDR),
group,
header.getTranStateTableOffset(),
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
index 31e1f94c55..fb97002df7 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteService.java
@@ -21,6 +21,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
@@ -31,14 +32,14 @@ public class ClusterTopicRouteService extends
TopicRouteService {
}
@Override
- public MessageQueueView getCurrentMessageQueueView(String topicName)
throws Exception {
- return getAllMessageQueueView(topicName);
+ public MessageQueueView getCurrentMessageQueueView(ProxyContext ctx,
String topicName) throws Exception {
+ return getAllMessageQueueView(ctx, topicName);
}
@Override
- public ProxyTopicRouteData getTopicRouteForProxy(List<Address>
requestHostAndPortList,
+ public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx,
List<Address> requestHostAndPortList,
String topicName) throws Exception {
- TopicRouteData topicRouteData =
getAllMessageQueueView(topicName).getTopicRouteData();
+ TopicRouteData topicRouteData = getAllMessageQueueView(ctx,
topicName).getTopicRouteData();
ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData();
proxyTopicRouteData.setQueueDatas(topicRouteData.getQueueDatas());
@@ -57,8 +58,8 @@ public class ClusterTopicRouteService extends
TopicRouteService {
}
@Override
- public String getBrokerAddr(String brokerName) throws Exception {
- List<BrokerData> brokerDataList =
getAllMessageQueueView(brokerName).getTopicRouteData().getBrokerDatas();
+ public String getBrokerAddr(ProxyContext ctx, String brokerName) throws
Exception {
+ List<BrokerData> brokerDataList = getAllMessageQueueView(ctx,
brokerName).getTopicRouteData().getBrokerDatas();
if (brokerDataList.isEmpty()) {
return null;
}
@@ -66,8 +67,8 @@ public class ClusterTopicRouteService extends
TopicRouteService {
}
@Override
- public AddressableMessageQueue buildAddressableMessageQueue(MessageQueue
messageQueue) throws Exception {
- String brokerAddress = getBrokerAddr(messageQueue.getBrokerName());
+ public AddressableMessageQueue buildAddressableMessageQueue(ProxyContext
ctx, MessageQueue messageQueue) throws Exception {
+ String brokerAddress = getBrokerAddr(ctx,
messageQueue.getBrokerName());
return new AddressableMessageQueue(messageQueue, brokerAddress);
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
index 3ac7ae75b9..d67b68f38e 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteService.java
@@ -26,6 +26,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.Address;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
@@ -51,15 +52,15 @@ public class LocalTopicRouteService extends
TopicRouteService {
}
@Override
- public MessageQueueView getCurrentMessageQueueView(String topic) throws
Exception {
+ public MessageQueueView getCurrentMessageQueueView(ProxyContext ctx,
String topic) throws Exception {
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
return new MessageQueueView(topic, toTopicRouteData(topicConfig));
}
@Override
- public ProxyTopicRouteData getTopicRouteForProxy(List<Address>
requestHostAndPortList,
+ public ProxyTopicRouteData getTopicRouteForProxy(ProxyContext ctx,
List<Address> requestHostAndPortList,
String topicName) throws Exception {
- MessageQueueView messageQueueView = getAllMessageQueueView(topicName);
+ MessageQueueView messageQueueView = getAllMessageQueueView(ctx,
topicName);
TopicRouteData topicRouteData = messageQueueView.getTopicRouteData();
ProxyTopicRouteData proxyTopicRouteData = new ProxyTopicRouteData();
@@ -83,13 +84,13 @@ public class LocalTopicRouteService extends
TopicRouteService {
}
@Override
- public String getBrokerAddr(String brokerName) throws Exception {
+ public String getBrokerAddr(ProxyContext ctx, String brokerName) throws
Exception {
return this.brokerController.getBrokerAddr();
}
@Override
- public AddressableMessageQueue buildAddressableMessageQueue(MessageQueue
messageQueue) throws Exception {
- String brokerAddress = getBrokerAddr(messageQueue.getBrokerName());
+ public AddressableMessageQueue buildAddressableMessageQueue(ProxyContext
ctx, MessageQueue messageQueue) throws Exception {
+ String brokerAddress = getBrokerAddr(ctx,
messageQueue.getBrokerName());
return new AddressableMessageQueue(messageQueue, brokerAddress);
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index ba97e183b0..3fa6414c39 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.AbstractCacheLoader;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.Address;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
@@ -109,18 +110,18 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
this.appendStartAndShutdown(this.mqClientAPIFactory);
}
- public MessageQueueView getAllMessageQueueView(String topicName) throws
Exception {
+ public MessageQueueView getAllMessageQueueView(ProxyContext ctx, String
topicName) throws Exception {
return getCacheMessageQueueWrapper(this.topicCache, topicName);
}
- public abstract MessageQueueView getCurrentMessageQueueView(String
topicName) throws Exception;
+ public abstract MessageQueueView getCurrentMessageQueueView(ProxyContext
ctx, String topicName) throws Exception;
- public abstract ProxyTopicRouteData getTopicRouteForProxy(List<Address>
requestHostAndPortList,
+ public abstract ProxyTopicRouteData getTopicRouteForProxy(ProxyContext
ctx, List<Address> requestHostAndPortList,
String topicName) throws Exception;
- public abstract String getBrokerAddr(String brokerName) throws Exception;
+ public abstract String getBrokerAddr(ProxyContext ctx, String brokerName)
throws Exception;
- public abstract AddressableMessageQueue
buildAddressableMessageQueue(MessageQueue messageQueue) throws Exception;
+ public abstract AddressableMessageQueue
buildAddressableMessageQueue(ProxyContext ctx, MessageQueue messageQueue)
throws Exception;
protected static MessageQueueView
getCacheMessageQueueWrapper(LoadingCache<String, MessageQueueView> topicCache,
String key) throws Exception {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
index cc988db782..2ef8497374 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/sysmessage/AbstractSystemMessageSyncer.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.common.utils.StartAndShutdown;
@@ -95,7 +96,7 @@ public abstract class AbstractSystemMessageSyncer implements
StartAndShutdown, M
JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8)
);
- AddressableMessageQueue messageQueue =
this.topicRouteService.getAllMessageQueueView(targetTopic)
+ AddressableMessageQueue messageQueue =
this.topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()),
targetTopic)
.getWriteSelector().selectOne(true);
this.mqClientAPIFactory.getClient().sendMessageAsync(
messageQueue.getBrokerAddr(),
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
index 3254d711d7..f0e083adea 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionService.java
@@ -29,13 +29,13 @@ public abstract class AbstractTransactionService implements
TransactionService,
protected TransactionDataManager transactionDataManager = new
TransactionDataManager();
@Override
- public TransactionData addTransactionDataByBrokerAddr(String brokerAddr,
String producerGroup, long tranStateTableOffset, long commitLogOffset, String
transactionId,
+ public TransactionData addTransactionDataByBrokerAddr(ProxyContext ctx,
String brokerAddr, String producerGroup, long tranStateTableOffset, long
commitLogOffset, String transactionId,
Message message) {
- return
this.addTransactionDataByBrokerName(this.getBrokerNameByAddr(brokerAddr),
producerGroup, tranStateTableOffset, commitLogOffset, transactionId, message);
+ return this.addTransactionDataByBrokerName(ctx,
this.getBrokerNameByAddr(brokerAddr), producerGroup, tranStateTableOffset,
commitLogOffset, transactionId, message);
}
@Override
- public TransactionData addTransactionDataByBrokerName(String brokerName,
String producerGroup, long tranStateTableOffset, long commitLogOffset, String
transactionId,
+ public TransactionData addTransactionDataByBrokerName(ProxyContext ctx,
String brokerName, String producerGroup, long tranStateTableOffset, long
commitLogOffset, String transactionId,
Message message) {
if (StringUtils.isBlank(brokerName)) {
return null;
@@ -55,7 +55,7 @@ public abstract class AbstractTransactionService implements
TransactionService,
}
@Override
- public EndTransactionRequestData genEndTransactionRequestHeader(String
producerGroup, Integer commitOrRollback,
+ public EndTransactionRequestData
genEndTransactionRequestHeader(ProxyContext ctx, String producerGroup, Integer
commitOrRollback,
boolean fromTransactionCheck, String msgId, String transactionId) {
TransactionData transactionData =
this.transactionDataManager.pollNoExpireTransactionData(producerGroup,
transactionId);
if (transactionData == null) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
index eff3ac4922..1ec4286463 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionService.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
@@ -67,20 +68,20 @@ public class ClusterTransactionService extends
AbstractTransactionService {
}
@Override
- public void addTransactionSubscription(String group, List<String>
topicList) {
+ public void addTransactionSubscription(ProxyContext ctx, String group,
List<String> topicList) {
for (String topic : topicList) {
- addTransactionSubscription(group, topic);
+ addTransactionSubscription(ctx, group, topic);
}
}
@Override
- public void addTransactionSubscription(String group, String topic) {
+ public void addTransactionSubscription(ProxyContext ctx, String group,
String topic) {
try {
groupClusterData.compute(group, (groupName, clusterDataSet) -> {
if (clusterDataSet == null) {
clusterDataSet = Sets.newHashSet();
}
- clusterDataSet.addAll(getClusterDataFromTopic(topic));
+ clusterDataSet.addAll(getClusterDataFromTopic(ctx, topic));
return clusterDataSet;
});
} catch (Exception e) {
@@ -89,17 +90,17 @@ public class ClusterTransactionService extends
AbstractTransactionService {
}
@Override
- public void replaceTransactionSubscription(String group, List<String>
topicList) {
+ public void replaceTransactionSubscription(ProxyContext ctx, String group,
List<String> topicList) {
Set<ClusterData> clusterDataSet = new HashSet<>();
for (String topic : topicList) {
- clusterDataSet.addAll(getClusterDataFromTopic(topic));
+ clusterDataSet.addAll(getClusterDataFromTopic(ctx, topic));
}
groupClusterData.put(group, clusterDataSet);
}
- private Set<ClusterData> getClusterDataFromTopic(String topic) {
+ private Set<ClusterData> getClusterDataFromTopic(ProxyContext ctx, String
topic) {
try {
- MessageQueueView messageQueue =
this.topicRouteService.getAllMessageQueueView(topic);
+ MessageQueueView messageQueue =
this.topicRouteService.getAllMessageQueueView(ctx, topic);
List<BrokerData> brokerDataList =
messageQueue.getTopicRouteData().getBrokerDatas();
if (brokerDataList == null) {
@@ -117,7 +118,7 @@ public class ClusterTransactionService extends
AbstractTransactionService {
}
@Override
- public void unSubscribeAllTransactionTopic(String group) {
+ public void unSubscribeAllTransactionTopic(ProxyContext ctx, String group)
{
groupClusterData.remove(group);
}
@@ -195,7 +196,7 @@ public class ClusterTransactionService extends
AbstractTransactionService {
protected void sendHeartBeatToCluster(String clusterName, HeartbeatData
heartbeatData, Map<String, String> brokerAddrNameMap) {
try {
- MessageQueueView messageQueue =
this.topicRouteService.getAllMessageQueueView(clusterName);
+ MessageQueueView messageQueue =
this.topicRouteService.getAllMessageQueueView(ProxyContext.createForInner(this.getClass()),
clusterName);
List<BrokerData> brokerDataList =
messageQueue.getTopicRouteData().getBrokerDatas();
if (brokerDataList == null) {
return;
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/LocalTransactionService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/LocalTransactionService.java
index 2371b25a24..4a27e4ff24 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/LocalTransactionService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/LocalTransactionService.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.proxy.service.transaction;
import java.util.List;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.proxy.common.ProxyContext;
/**
* no need to implements, because the channel of producer will put into the
broker's producerManager
@@ -31,22 +32,22 @@ public class LocalTransactionService extends
AbstractTransactionService {
}
@Override
- public void addTransactionSubscription(String group, List<String>
topicList) {
+ public void addTransactionSubscription(ProxyContext ctx, String group,
List<String> topicList) {
}
@Override
- public void addTransactionSubscription(String group, String topic) {
+ public void addTransactionSubscription(ProxyContext ctx, String group,
String topic) {
}
@Override
- public void replaceTransactionSubscription(String group, List<String>
topicList) {
+ public void replaceTransactionSubscription(ProxyContext ctx, String group,
List<String> topicList) {
}
@Override
- public void unSubscribeAllTransactionTopic(String group) {
+ public void unSubscribeAllTransactionTopic(ProxyContext ctx, String group)
{
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionService.java
index 2a851051eb..a7ab353242 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/transaction/TransactionService.java
@@ -22,21 +22,21 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
public interface TransactionService {
- void addTransactionSubscription(String group, List<String> topicList);
+ void addTransactionSubscription(ProxyContext ctx, String group,
List<String> topicList);
- void addTransactionSubscription(String group, String topic);
+ void addTransactionSubscription(ProxyContext ctx, String group, String
topic);
- void replaceTransactionSubscription(String group, List<String> topicList);
+ void replaceTransactionSubscription(ProxyContext ctx, String group,
List<String> topicList);
- void unSubscribeAllTransactionTopic(String group);
+ void unSubscribeAllTransactionTopic(ProxyContext ctx, String group);
- TransactionData addTransactionDataByBrokerAddr(String brokerAddr, String
producerGroup, long tranStateTableOffset, long commitLogOffset, String
transactionId,
+ TransactionData addTransactionDataByBrokerAddr(ProxyContext ctx, String
brokerAddr, String producerGroup, long tranStateTableOffset, long
commitLogOffset, String transactionId,
Message message);
- TransactionData addTransactionDataByBrokerName(String brokerName, String
producerGroup, long tranStateTableOffset, long commitLogOffset, String
transactionId,
+ TransactionData addTransactionDataByBrokerName(ProxyContext ctx, String
brokerName, String producerGroup, long tranStateTableOffset, long
commitLogOffset, String transactionId,
Message message);
- EndTransactionRequestData genEndTransactionRequestHeader(String
producerGroup, Integer commitOrRollback,
+ EndTransactionRequestData genEndTransactionRequestHeader(ProxyContext ctx,
String producerGroup, Integer commitOrRollback,
boolean fromTransactionCheck, String msgId, String transactionId);
void onSendCheckTransactionStateFailed(ProxyContext context, String
producerGroup, TransactionData transactionData);
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
index 8431077020..a5d4e3c919 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/client/ClientActivityTest.java
@@ -134,7 +134,7 @@ public class ClientActivityTest extends BaseActivityTest {
txProducerTopicArgumentCaptor.capture()
);
-
when(this.metadataService.getTopicMessageType(anyString())).thenReturn(TopicMessageType.TRANSACTION);
+ when(this.metadataService.getTopicMessageType(any(),
anyString())).thenReturn(TopicMessageType.TRANSACTION);
HeartbeatResponse response = this.sendProducerHeartbeat(context);
@@ -222,7 +222,7 @@ public class ClientActivityTest extends BaseActivityTest {
.build());
ArgumentCaptor<ClientChannelInfo> channelInfoArgumentCaptor =
ArgumentCaptor.forClass(ClientChannelInfo.class);
doNothing().when(this.messagingProcessor).unRegisterProducer(any(),
anyString(), channelInfoArgumentCaptor.capture());
-
when(this.metadataService.getTopicMessageType(anyString())).thenReturn(TopicMessageType.NORMAL);
+ when(this.metadataService.getTopicMessageType(any(),
anyString())).thenReturn(TopicMessageType.NORMAL);
this.sendProducerTelemetry(context);
this.sendProducerHeartbeat(context);
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
index a861e8c13f..fdd052da76 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ChangeInvisibleDurationActivityTest.java
@@ -92,7 +92,7 @@ public class ChangeInvisibleDurationActivityTest extends
BaseActivityTest {
when(this.messagingProcessor.changeInvisibleTime(
any(), receiptHandleCaptor.capture(), anyString(), anyString(),
anyString(), invisibleTimeArgumentCaptor.capture()
)).thenReturn(CompletableFuture.completedFuture(ackResult));
- when(receiptHandleProcessor.removeReceiptHandle(any(), anyString(),
anyString(), anyString()))
+ when(receiptHandleProcessor.removeReceiptHandle(any(), any(),
anyString(), anyString(), anyString()))
.thenReturn(new MessageReceiptHandle("group", "topic", 0,
savedHandleStr, "msgId", 0, 0));
ChangeInvisibleDurationResponse response =
this.changeInvisibleDurationActivity.changeInvisibleDuration(
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
index 68db3020e3..ec620340c5 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/ForwardMessageToDLQActivityTest.java
@@ -75,7 +75,7 @@ public class ForwardMessageToDLQActivityTest extends
BaseActivityTest {
.thenReturn(CompletableFuture.completedFuture(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
"")));
String savedHandleStr = buildReceiptHandle("topic",
System.currentTimeMillis(),3000);
- when(receiptHandleProcessor.removeReceiptHandle(any(), anyString(),
anyString(), anyString()))
+ when(receiptHandleProcessor.removeReceiptHandle(any(), any(),
anyString(), anyString(), anyString()))
.thenReturn(new MessageReceiptHandle("group", "topic", 0,
savedHandleStr, "msgId", 0, 0));
ForwardMessageToDeadLetterQueueResponse response =
this.forwardMessageToDLQActivity.forwardMessageToDeadLetterQueue(
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
index ce98b7494d..a7ba69098b 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/route/RouteActivityTest.java
@@ -101,7 +101,7 @@ public class RouteActivityTest extends BaseActivityTest {
.thenReturn(createProxyTopicRouteData(2, 2, 6));
MetadataService metadataService =
Mockito.mock(LocalMetadataService.class);
when(this.messagingProcessor.getMetadataService()).thenReturn(metadataService);
-
when(metadataService.getTopicMessageType(anyString())).thenReturn(TopicMessageType.NORMAL);
+ when(metadataService.getTopicMessageType(any(),
anyString())).thenReturn(TopicMessageType.NORMAL);
QueryRouteResponse response = this.routeActivity.queryRoute(
createContext(),
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
index c695eb0944..876b25b30b 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
@@ -38,6 +38,7 @@ import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
@@ -91,7 +92,7 @@ public class ConsumerProcessorTest extends BaseProcessorTest {
when(this.messageService.popMessage(any(),
messageQueueArgumentCaptor.capture(), requestHeaderArgumentCaptor.capture(),
anyLong()))
.thenReturn(CompletableFuture.completedFuture(innerPopResult));
- when(this.topicRouteService.getCurrentMessageQueueView(anyString()))
+ when(this.topicRouteService.getCurrentMessageQueueView(any(),
anyString()))
.thenReturn(mock(MessageQueueView.class));
ArgumentCaptor<String> ackMessageIdArgumentCaptor =
ArgumentCaptor.forClass(String.class);
@@ -191,12 +192,12 @@ public class ConsumerProcessorTest extends
BaseProcessorTest {
AddressableMessageQueue addressableMessageQueue2 = new
AddressableMessageQueue(mq2, "127.0.0.1");
mqSet.add(mq1);
mqSet.add(mq2);
-
when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i
-> new AddressableMessageQueue((MessageQueue) i.getArguments()[0],
"127.0.0.1"));
+ when(this.topicRouteService.buildAddressableMessageQueue(any(),
any())).thenAnswer(i -> new AddressableMessageQueue((MessageQueue)
i.getArguments()[1], "127.0.0.1"));
when(this.messageService.lockBatchMQ(any(),
eq(addressableMessageQueue1), any(), anyLong()))
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1)));
when(this.messageService.lockBatchMQ(any(),
eq(addressableMessageQueue2), any(), anyLong()))
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq2)));
- Set<MessageQueue> result = this.consumerProcessor.lockBatchMQ(null,
mqSet, CONSUMER_GROUP, CLIENT_ID, 1000)
+ Set<MessageQueue> result =
this.consumerProcessor.lockBatchMQ(ProxyContext.create(), mqSet,
CONSUMER_GROUP, CLIENT_ID, 1000)
.get();
assertThat(result).isEqualTo(mqSet);
}
@@ -210,12 +211,12 @@ public class ConsumerProcessorTest extends
BaseProcessorTest {
AddressableMessageQueue addressableMessageQueue2 = new
AddressableMessageQueue(mq2, "127.0.0.1");
mqSet.add(mq1);
mqSet.add(mq2);
-
when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i
-> new AddressableMessageQueue((MessageQueue) i.getArguments()[0],
"127.0.0.1"));
+ when(this.topicRouteService.buildAddressableMessageQueue(any(),
any())).thenAnswer(i -> new AddressableMessageQueue((MessageQueue)
i.getArguments()[1], "127.0.0.1"));
when(this.messageService.lockBatchMQ(any(),
eq(addressableMessageQueue1), any(), anyLong()))
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1)));
when(this.messageService.lockBatchMQ(any(),
eq(addressableMessageQueue2), any(), anyLong()))
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet()));
- Set<MessageQueue> result = this.consumerProcessor.lockBatchMQ(null,
mqSet, CONSUMER_GROUP, CLIENT_ID, 1000)
+ Set<MessageQueue> result =
this.consumerProcessor.lockBatchMQ(ProxyContext.create(), mqSet,
CONSUMER_GROUP, CLIENT_ID, 1000)
.get();
assertThat(result).isEqualTo(Sets.newHashSet(mq1));
}
@@ -229,14 +230,14 @@ public class ConsumerProcessorTest extends
BaseProcessorTest {
AddressableMessageQueue addressableMessageQueue2 = new
AddressableMessageQueue(mq2, "127.0.0.1");
mqSet.add(mq1);
mqSet.add(mq2);
-
when(this.topicRouteService.buildAddressableMessageQueue(any())).thenAnswer(i
-> new AddressableMessageQueue((MessageQueue) i.getArguments()[0],
"127.0.0.1"));
+ when(this.topicRouteService.buildAddressableMessageQueue(any(),
any())).thenAnswer(i -> new AddressableMessageQueue((MessageQueue)
i.getArguments()[1], "127.0.0.1"));
when(this.messageService.lockBatchMQ(any(),
eq(addressableMessageQueue1), any(), anyLong()))
.thenReturn(CompletableFuture.completedFuture(Sets.newHashSet(mq1)));
CompletableFuture<Set<MessageQueue>> future = new
CompletableFuture<>();
future.completeExceptionally(new MQBrokerException(1, "err"));
when(this.messageService.lockBatchMQ(any(),
eq(addressableMessageQueue2), any(), anyLong()))
.thenReturn(future);
- Set<MessageQueue> result = this.consumerProcessor.lockBatchMQ(null,
mqSet, CONSUMER_GROUP, CLIENT_ID, 1000)
+ Set<MessageQueue> result =
this.consumerProcessor.lockBatchMQ(ProxyContext.create(), mqSet,
CONSUMER_GROUP, CLIENT_ID, 1000)
.get();
assertThat(result).isEqualTo(Sets.newHashSet(mq1));
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java
index 213e6a6beb..de63b7e75f 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ProducerProcessorTest.java
@@ -70,7 +70,7 @@ public class ProducerProcessorTest extends BaseProcessorTest {
@Test
public void testSendMessage() throws Throwable {
-
when(metadataService.getTopicMessageType(eq(TOPIC))).thenReturn(TopicMessageType.NORMAL);
+ when(metadataService.getTopicMessageType(any(),
eq(TOPIC))).thenReturn(TopicMessageType.NORMAL);
String txId = MessageClientIDSetter.createUniqID();
String msgId = MessageClientIDSetter.createUniqID();
long commitLogOffset = 1000L;
@@ -96,6 +96,7 @@ public class ProducerProcessorTest extends BaseProcessorTest {
ArgumentCaptor<Long> tranStateTableOffsetCaptor =
ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Long> commitLogOffsetCaptor =
ArgumentCaptor.forClass(Long.class);
when(transactionService.addTransactionDataByBrokerName(
+ any(),
brokerNameCaptor.capture(),
anyString(),
tranStateTableOffsetCaptor.capture(),
@@ -150,6 +151,7 @@ public class ProducerProcessorTest extends
BaseProcessorTest {
ArgumentCaptor<Long> tranStateTableOffsetCaptor =
ArgumentCaptor.forClass(Long.class);
ArgumentCaptor<Long> commitLogOffsetCaptor =
ArgumentCaptor.forClass(Long.class);
when(transactionService.addTransactionDataByBrokerName(
+ any(),
brokerNameCaptor.capture(),
anyString(),
tranStateTableOffsetCaptor.capture(),
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
index c0bff981f0..7206e6b791 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -107,8 +107,8 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
@Test
public void testAddReceiptHandle() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
-
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(new
SubscriptionGroupConfig());
+ receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, receiptHandle, messageReceiptHandle);
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
receiptHandleProcessor.scheduleRenewTask();
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
@@ -120,9 +120,9 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
public void testRenewReceiptHandle() {
ProxyConfig config = ConfigurationManager.getProxyConfig();
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, receiptHandle, messageReceiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
-
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
long newInvisibleTime = 18000L;
@@ -167,7 +167,7 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
ProxyConfig config = ConfigurationManager.getProxyConfig();
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, receiptHandle, messageReceiptHandle);
CompletableFuture<AckResult> ackResultFuture = new
CompletableFuture<>();
ackResultFuture.completeExceptionally(new MQClientException(0,
"error"));
@@ -197,7 +197,7 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
public void testRenewWithInvalidHandle() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, receiptHandle, messageReceiptHandle);
CompletableFuture<AckResult> ackResultFuture = new
CompletableFuture<>();
ackResultFuture.completeExceptionally(new
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "error"));
@@ -221,7 +221,7 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
ProxyConfig config = ConfigurationManager.getProxyConfig();
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
- receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, receiptHandle, messageReceiptHandle);
AtomicInteger count = new AtomicInteger(0);
List<CompletableFuture<AckResult>> futureList = new ArrayList<>();
@@ -299,10 +299,10 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
newReceiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, newReceiptHandle, messageReceiptHandle);
Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
-
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(),
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyLong()))
.thenReturn(CompletableFuture.completedFuture(new AckResult()));
receiptHandleProcessor.scheduleRenewTask();
@@ -333,9 +333,9 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
newReceiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, newReceiptHandle, messageReceiptHandle);
Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
-
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(null);
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(null);
Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(),
Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(),
Mockito.anyLong()))
.thenReturn(CompletableFuture.completedFuture(new AckResult()));
receiptHandleProcessor.scheduleRenewTask();
@@ -369,9 +369,9 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
messageReceiptHandle = new MessageReceiptHandle(GROUP, TOPIC,
QUEUE_ID, newReceiptHandle, MESSAGE_ID, OFFSET,
RECONSUME_TIMES);
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
newReceiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, newReceiptHandle, messageReceiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
-
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
Mockito.when(messagingProcessor.findConsumerChannel(Mockito.any(),
Mockito.eq(GROUP),
Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
receiptHandleProcessor.scheduleRenewTask();
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0))
@@ -382,10 +382,10 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
@Test
public void testRemoveReceiptHandle() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
- receiptHandleProcessor.removeReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle);
+ receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, receiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.removeReceiptHandle(PROXY_CONTEXT, channel,
GROUP, MSG_ID, receiptHandle);
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
-
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
receiptHandleProcessor.scheduleRenewTask();
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0))
.changeInvisibleTime(Mockito.any(ProxyContext.class),
Mockito.any(ReceiptHandle.class), Mockito.anyString(),
@@ -395,10 +395,10 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
@Test
public void testClearGroup() {
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, receiptHandle, messageReceiptHandle);
receiptHandleProcessor.clearGroup(new
ReceiptHandleProcessor.ReceiptHandleGroupKey(channel, GROUP));
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
-
Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.eq(GROUP))).thenReturn(groupConfig);
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(),
Mockito.eq(GROUP))).thenReturn(groupConfig);
receiptHandleProcessor.scheduleRenewTask();
Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
.changeInvisibleTime(Mockito.any(ProxyContext.class),
Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
@@ -410,7 +410,7 @@ public class ReceiptHandleProcessorTest extends
BaseProcessorTest {
ArgumentCaptor<ConsumerIdsChangeListener> listenerArgumentCaptor =
ArgumentCaptor.forClass(ConsumerIdsChangeListener.class);
Mockito.verify(messagingProcessor,
Mockito.times(1)).registerConsumerListener(listenerArgumentCaptor.capture());
Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
- receiptHandleProcessor.addReceiptHandle(channel, GROUP, MSG_ID,
receiptHandle, messageReceiptHandle);
+ receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, channel, GROUP,
MSG_ID, receiptHandle, messageReceiptHandle);
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER,
GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
assertTrue(receiptHandleProcessor.receiptHandleGroupMap.isEmpty());
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/TransactionProcessorTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/TransactionProcessorTest.java
index f9473b450e..6bffb15bd1 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/TransactionProcessorTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/TransactionProcessorTest.java
@@ -54,7 +54,7 @@ public class TransactionProcessorTest extends
BaseProcessorTest {
protected void testEndTransaction(int sysFlag, TransactionStatus
transactionStatus) throws Throwable {
when(this.messageService.endTransactionOneway(any(), any(), any(),
anyLong())).thenReturn(CompletableFuture.completedFuture(null));
ArgumentCaptor<Integer> commitOrRollbackCaptor =
ArgumentCaptor.forClass(Integer.class);
- when(transactionService.genEndTransactionRequestHeader(anyString(),
commitOrRollbackCaptor.capture(), anyBoolean(), anyString(), anyString()))
+ when(transactionService.genEndTransactionRequestHeader(any(),
anyString(), commitOrRollbackCaptor.capture(), anyBoolean(), anyString(),
anyString()))
.thenReturn(new EndTransactionRequestData("brokerName", new
EndTransactionRequestHeader()));
this.transactionProcessor.endTransaction(
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
index b88f6677ef..9d897642fd 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java
@@ -76,7 +76,7 @@ public class SendMessageActivityTest extends InitConfigTest {
@Test
public void testSendMessage() throws Exception {
-
when(metadataServiceMock.getTopicMessageType(eq(topic))).thenReturn(TopicMessageType.NORMAL);
+ when(metadataServiceMock.getTopicMessageType(any(),
eq(topic))).thenReturn(TopicMessageType.NORMAL);
Message message = new Message(topic, "123".getBytes());
message.putUserProperty("a", "b");
SendMessageRequestHeader sendMessageRequestHeader = new
SendMessageRequestHeader();
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
index baecd47428..c97bd5a72b 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/BaseServiceTest.java
@@ -35,6 +35,7 @@ import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -76,8 +77,8 @@ public class BaseServiceTest extends InitConfigTest {
brokerData.setBrokerAddrs(brokerAddrs);
topicRouteData.setBrokerDatas(Lists.newArrayList(brokerData));
-
when(this.topicRouteService.getAllMessageQueueView(eq(ERR_TOPIC))).thenThrow(new
MQClientException(ResponseCode.TOPIC_NOT_EXIST, ""));
-
when(this.topicRouteService.getAllMessageQueueView(eq(TOPIC))).thenReturn(new
MessageQueueView(TOPIC, topicRouteData));
-
when(this.topicRouteService.getAllMessageQueueView(eq(CLUSTER_NAME))).thenReturn(new
MessageQueueView(CLUSTER_NAME, topicRouteData));
+ when(this.topicRouteService.getAllMessageQueueView(any(),
eq(ERR_TOPIC))).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST,
""));
+ when(this.topicRouteService.getAllMessageQueueView(any(),
eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData));
+ when(this.topicRouteService.getAllMessageQueueView(any(),
eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME,
topicRouteData));
}
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/ClusterMessageServiceTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/ClusterMessageServiceTest.java
index 734207f025..7e4d25f0c0 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/ClusterMessageServiceTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/message/ClusterMessageServiceTest.java
@@ -32,6 +32,7 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -50,7 +51,7 @@ public class ClusterMessageServiceTest {
@Test
public void testAckMessageByInvalidBrokerNameHandle() throws Exception {
- when(topicRouteService.getBrokerAddr(anyString())).thenThrow(new
MQClientException(ResponseCode.TOPIC_NOT_EXIST, ""));
+ when(topicRouteService.getBrokerAddr(any(),
anyString())).thenThrow(new MQClientException(ResponseCode.TOPIC_NOT_EXIST,
""));
try {
this.clusterMessageService.ackMessage(
ProxyContext.create(),
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
index 50afbc48f8..98bf1104f8 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.service.metadata;
import java.util.HashMap;
import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.service.BaseServiceTest;
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
@@ -54,17 +55,19 @@ public class ClusterMetadataServiceTest extends
BaseServiceTest {
@Test
public void testGetTopicMessageType() {
- assertEquals(TopicMessageType.UNSPECIFIED,
this.clusterMetadataService.getTopicMessageType(ERR_TOPIC));
+ ProxyContext ctx = ProxyContext.create();
+ assertEquals(TopicMessageType.UNSPECIFIED,
this.clusterMetadataService.getTopicMessageType(ctx, ERR_TOPIC));
assertEquals(1,
this.clusterMetadataService.topicConfigCache.asMap().size());
- assertEquals(TopicMessageType.UNSPECIFIED,
this.clusterMetadataService.getTopicMessageType(ERR_TOPIC));
+ assertEquals(TopicMessageType.UNSPECIFIED,
this.clusterMetadataService.getTopicMessageType(ctx, ERR_TOPIC));
- assertEquals(TopicMessageType.NORMAL,
this.clusterMetadataService.getTopicMessageType(TOPIC));
+ assertEquals(TopicMessageType.NORMAL,
this.clusterMetadataService.getTopicMessageType(ctx, TOPIC));
assertEquals(2,
this.clusterMetadataService.topicConfigCache.asMap().size());
}
@Test
public void testGetSubscriptionGroupConfig() {
-
assertNotNull(this.clusterMetadataService.getSubscriptionGroupConfig(GROUP));
+ ProxyContext ctx = ProxyContext.create();
+
assertNotNull(this.clusterMetadataService.getSubscriptionGroupConfig(ctx,
GROUP));
assertEquals(1,
this.clusterMetadataService.subscriptionGroupConfigCache.asMap().size());
}
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
index 9b27eb56c4..b5fc1b6713 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.proxy.common.Address;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.service.BaseServiceTest;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.assertj.core.util.Lists;
@@ -61,18 +62,20 @@ public class ClusterTopicRouteServiceTest extends
BaseServiceTest {
@Test
public void testGetCurrentMessageQueueView() throws Throwable {
- MQClientException exception = catchThrowableOfType(() ->
this.topicRouteService.getCurrentMessageQueueView(ERR_TOPIC),
MQClientException.class);
+ ProxyContext ctx = ProxyContext.create();
+ MQClientException exception = catchThrowableOfType(() ->
this.topicRouteService.getCurrentMessageQueueView(ctx, ERR_TOPIC),
MQClientException.class);
assertTrue(TopicRouteHelper.isTopicNotExistError(exception));
assertEquals(1, this.topicRouteService.topicCache.asMap().size());
-
assertNotNull(this.topicRouteService.getCurrentMessageQueueView(TOPIC));
+ assertNotNull(this.topicRouteService.getCurrentMessageQueueView(ctx,
TOPIC));
assertEquals(2, this.topicRouteService.topicCache.asMap().size());
}
@Test
public void testGetTopicRouteForProxy() throws Throwable {
+ ProxyContext ctx = ProxyContext.create();
List<Address> addressList = Lists.newArrayList(new
Address(Address.AddressScheme.IPv4, HostAndPort.fromParts("127.0.0.1", 8888)));
- ProxyTopicRouteData proxyTopicRouteData =
this.topicRouteService.getTopicRouteForProxy(addressList, TOPIC);
+ ProxyTopicRouteData proxyTopicRouteData =
this.topicRouteService.getTopicRouteForProxy(ctx, addressList, TOPIC);
assertEquals(1, proxyTopicRouteData.getBrokerDatas().size());
assertEquals(addressList,
proxyTopicRouteData.getBrokerDatas().get(0).getBrokerAddrs().get(MixAll.MASTER_ID));
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java
index 4948cddc2e..1ad39a1db6 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/LocalTopicRouteServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.proxy.common.Address;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.service.BaseServiceTest;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
@@ -76,8 +77,9 @@ public class LocalTopicRouteServiceTest extends
BaseServiceTest {
@Test
public void testGetCurrentMessageQueueView() throws Throwable {
+ ProxyContext ctx = ProxyContext.create();
this.topicConfigTable.put(TOPIC, new TopicConfig(TOPIC, 3, 2,
PermName.PERM_WRITE | PermName.PERM_READ));
- MessageQueueView messageQueueView =
this.topicRouteService.getCurrentMessageQueueView(TOPIC);
+ MessageQueueView messageQueueView =
this.topicRouteService.getCurrentMessageQueueView(ctx, TOPIC);
assertEquals(3, messageQueueView.getReadSelector().getQueues().size());
assertEquals(2,
messageQueueView.getWriteSelector().getQueues().size());
assertEquals(1,
messageQueueView.getReadSelector().getBrokerActingQueues().size());
@@ -90,7 +92,8 @@ public class LocalTopicRouteServiceTest extends
BaseServiceTest {
@Test
public void testGetTopicRouteForProxy() throws Throwable {
- ProxyTopicRouteData proxyTopicRouteData =
this.topicRouteService.getTopicRouteForProxy(new ArrayList<>(), TOPIC);
+ ProxyContext ctx = ProxyContext.create();
+ ProxyTopicRouteData proxyTopicRouteData =
this.topicRouteService.getTopicRouteForProxy(ctx, new ArrayList<>(), TOPIC);
assertEquals(1, proxyTopicRouteData.getBrokerDatas().size());
assertEquals(
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
index 6540523598..6373aba308 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/sysmessage/HeartbeatSyncerTest.java
@@ -133,7 +133,7 @@ public class HeartbeatSyncerTest extends InitConfigTest {
brokerData.setBrokerAddrs(brokerAddr);
topicRouteData.getBrokerDatas().add(brokerData);
MessageQueueView messageQueueView = new MessageQueueView("foo",
topicRouteData);
-
when(this.topicRouteService.getAllMessageQueueView(anyString())).thenReturn(messageQueueView);
+ when(this.topicRouteService.getAllMessageQueueView(any(),
anyString())).thenReturn(messageQueueView);
}
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionServiceTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionServiceTest.java
index 6e4af2e8f6..81de5ec843 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionServiceTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/AbstractTransactionServiceTest.java
@@ -37,6 +37,7 @@ public class AbstractTransactionServiceTest extends
InitConfigTest {
private static final String BROKER_NAME = "mockBroker";
private static final String PRODUCER_GROUP = "producerGroup";
private static final Random RANDOM = new Random();
+ private final ProxyContext ctx =
ProxyContext.createForInner(this.getClass());
public static class MockAbstractTransactionServiceTest extends
AbstractTransactionService {
@@ -46,22 +47,22 @@ public class AbstractTransactionServiceTest extends
InitConfigTest {
}
@Override
- public void addTransactionSubscription(String group, List<String>
topicList) {
+ public void addTransactionSubscription(ProxyContext ctx, String group,
List<String> topicList) {
}
@Override
- public void addTransactionSubscription(String group, String topic) {
+ public void addTransactionSubscription(ProxyContext ctx, String group,
String topic) {
}
@Override
- public void replaceTransactionSubscription(String group, List<String>
topicList) {
+ public void replaceTransactionSubscription(ProxyContext ctx, String
group, List<String> topicList) {
}
@Override
- public void unSubscribeAllTransactionTopic(String group) {
+ public void unSubscribeAllTransactionTopic(ProxyContext ctx, String
group) {
}
}
@@ -81,6 +82,7 @@ public class AbstractTransactionServiceTest extends
InitConfigTest {
String txId = MessageClientIDSetter.createUniqID();
TransactionData transactionData =
transactionService.addTransactionDataByBrokerName(
+ ctx,
BROKER_NAME,
PRODUCER_GROUP,
RANDOM.nextLong(),
@@ -91,6 +93,7 @@ public class AbstractTransactionServiceTest extends
InitConfigTest {
assertNotNull(transactionData);
EndTransactionRequestData requestData =
transactionService.genEndTransactionRequestHeader(
+ ctx,
PRODUCER_GROUP,
MessageSysFlag.TRANSACTION_COMMIT_TYPE,
true,
@@ -104,6 +107,7 @@ public class AbstractTransactionServiceTest extends
InitConfigTest {
assertEquals(transactionData.getTranStateTableOffset(),
requestData.getRequestHeader().getTranStateTableOffset().longValue());
assertNull(transactionService.genEndTransactionRequestHeader(
+ ctx,
"group",
MessageSysFlag.TRANSACTION_COMMIT_TYPE,
true,
@@ -119,6 +123,7 @@ public class AbstractTransactionServiceTest extends
InitConfigTest {
String txId = MessageClientIDSetter.createUniqID();
TransactionData transactionData =
transactionService.addTransactionDataByBrokerName(
+ ctx,
BROKER_NAME,
PRODUCER_GROUP,
RANDOM.nextLong(),
@@ -128,6 +133,7 @@ public class AbstractTransactionServiceTest extends
InitConfigTest {
);
transactionService.onSendCheckTransactionStateFailed(ProxyContext.createForInner(this.getClass()),
PRODUCER_GROUP, transactionData);
assertNull(transactionService.genEndTransactionRequestHeader(
+ ctx,
PRODUCER_GROUP,
MessageSysFlag.TRANSACTION_COMMIT_TYPE,
true,
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
index 2b56839301..a0063544ec 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/transaction/ClusterTransactionServiceTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.service.BaseServiceTest;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
@@ -44,6 +45,7 @@ import org.mockito.Mockito;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@@ -53,7 +55,7 @@ public class ClusterTransactionServiceTest extends
BaseServiceTest {
@Mock
private ProducerManager producerManager;
-
+ private ProxyContext ctx = ProxyContext.create();
private ClusterTransactionService clusterTransactionService;
@Before
@@ -63,7 +65,7 @@ public class ClusterTransactionServiceTest extends
BaseServiceTest {
this.mqClientAPIFactory);
MessageQueueView messageQueueView = new MessageQueueView(TOPIC,
topicRouteData);
- when(this.topicRouteService.getAllMessageQueueView(anyString()))
+ when(this.topicRouteService.getAllMessageQueueView(any(), anyString()))
.thenReturn(messageQueueView);
when(mqClientAPIFactory.getClient()).thenReturn(mqClientAPIExt);
@@ -71,7 +73,7 @@ public class ClusterTransactionServiceTest extends
BaseServiceTest {
@Test
public void testAddTransactionSubscription() {
- this.clusterTransactionService.addTransactionSubscription(GROUP,
TOPIC);
+ this.clusterTransactionService.addTransactionSubscription(ctx, GROUP,
TOPIC);
assertEquals(1,
this.clusterTransactionService.getGroupClusterData().size());
assertEquals(CLUSTER_NAME,
this.clusterTransactionService.getGroupClusterData().get(GROUP).stream().findAny().get().getCluster());
@@ -79,7 +81,7 @@ public class ClusterTransactionServiceTest extends
BaseServiceTest {
@Test
public void testAddTransactionSubscriptionTopicList() {
- this.clusterTransactionService.addTransactionSubscription(GROUP,
Lists.newArrayList(TOPIC + 1, TOPIC + 2));
+ this.clusterTransactionService.addTransactionSubscription(ctx, GROUP,
Lists.newArrayList(TOPIC + 1, TOPIC + 2));
assertEquals(1,
this.clusterTransactionService.getGroupClusterData().size());
assertEquals(CLUSTER_NAME,
this.clusterTransactionService.getGroupClusterData().get(GROUP).stream().findAny().get().getCluster());
@@ -87,21 +89,21 @@ public class ClusterTransactionServiceTest extends
BaseServiceTest {
@Test
public void testReplaceTransactionSubscription() {
- this.clusterTransactionService.addTransactionSubscription(GROUP,
TOPIC);
+ this.clusterTransactionService.addTransactionSubscription(ctx, GROUP,
TOPIC);
assertEquals(1,
this.clusterTransactionService.getGroupClusterData().size());
assertEquals(CLUSTER_NAME,
this.clusterTransactionService.getGroupClusterData().get(GROUP).stream().findAny().get().getCluster());
this.brokerData.setCluster(CLUSTER_NAME + 1);
- this.clusterTransactionService.replaceTransactionSubscription(GROUP,
Lists.newArrayList(TOPIC + 1));
+ this.clusterTransactionService.replaceTransactionSubscription(ctx,
GROUP, Lists.newArrayList(TOPIC + 1));
assertEquals(1,
this.clusterTransactionService.getGroupClusterData().size());
assertEquals(CLUSTER_NAME + 1,
this.clusterTransactionService.getGroupClusterData().get(GROUP).stream().findAny().get().getCluster());
}
@Test
public void testUnSubscribeAllTransactionTopic() {
- this.clusterTransactionService.addTransactionSubscription(GROUP,
TOPIC);
- this.clusterTransactionService.unSubscribeAllTransactionTopic(GROUP);
+ this.clusterTransactionService.addTransactionSubscription(ctx, GROUP,
TOPIC);
+ this.clusterTransactionService.unSubscribeAllTransactionTopic(ctx,
GROUP);
assertEquals(0,
this.clusterTransactionService.getGroupClusterData().size());
}
@@ -125,7 +127,7 @@ public class ClusterTransactionServiceTest extends
BaseServiceTest {
brokerData.setBrokerAddrs(brokerAddrs);
topicRouteData.getQueueDatas().add(queueData);
topicRouteData.getBrokerDatas().add(brokerData);
-
when(this.topicRouteService.getAllMessageQueueView(eq(TOPIC))).thenReturn(new
MessageQueueView(TOPIC, topicRouteData));
+ when(this.topicRouteService.getAllMessageQueueView(any(),
eq(TOPIC))).thenReturn(new MessageQueueView(TOPIC, topicRouteData));
TopicRouteData clusterTopicRouteData = new TopicRouteData();
QueueData clusterQueueData = new QueueData();
@@ -139,7 +141,7 @@ public class ClusterTransactionServiceTest extends
BaseServiceTest {
brokerAddrs.put(MixAll.MASTER_ID, BROKER_ADDR);
clusterBrokerData.setBrokerAddrs(brokerAddrs);
clusterTopicRouteData.setBrokerDatas(Lists.newArrayList(clusterBrokerData));
-
when(this.topicRouteService.getAllMessageQueueView(eq(CLUSTER_NAME))).thenReturn(new
MessageQueueView(CLUSTER_NAME, clusterTopicRouteData));
+ when(this.topicRouteService.getAllMessageQueueView(any(),
eq(CLUSTER_NAME))).thenReturn(new MessageQueueView(CLUSTER_NAME,
clusterTopicRouteData));
TopicRouteData clusterTopicRouteData2 = new TopicRouteData();
QueueData clusterQueueData2 = new QueueData();
@@ -153,7 +155,7 @@ public class ClusterTransactionServiceTest extends
BaseServiceTest {
brokerAddrs.put(MixAll.MASTER_ID, brokerAddr2);
clusterBrokerData2.setBrokerAddrs(brokerAddrs);
clusterTopicRouteData2.setBrokerDatas(Lists.newArrayList(clusterBrokerData2));
-
when(this.topicRouteService.getAllMessageQueueView(eq(clusterName2))).thenReturn(new
MessageQueueView(clusterName2, clusterTopicRouteData2));
+ when(this.topicRouteService.getAllMessageQueueView(any(),
eq(clusterName2))).thenReturn(new MessageQueueView(clusterName2,
clusterTopicRouteData2));
ConfigurationManager.getProxyConfig().setTransactionHeartbeatBatchNum(2);
this.clusterTransactionService.start();
@@ -161,7 +163,7 @@ public class ClusterTransactionServiceTest extends
BaseServiceTest {
for (int i = 0; i < 3; i++) {
groupSet.add(GROUP + i);
- this.clusterTransactionService.addTransactionSubscription(GROUP +
i, TOPIC);
+ this.clusterTransactionService.addTransactionSubscription(ctx,
GROUP + i, TOPIC);
}
ArgumentCaptor<String> brokerAddrArgumentCaptor =
ArgumentCaptor.forClass(String.class);