This is an automated email from the ASF dual-hosted git repository. dongeforever pushed a commit to branch 5.0.0-alpha-static-topic in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 08618d512ef1c3e711c65447571eb4ba7962a744 Author: dongeforever <[email protected]> AuthorDate: Tue Nov 9 19:53:53 2021 +0800 Finish the findBrokerAddr for admin publish subscribe --- .../consumer/store/RemoteBrokerOffsetStore.java | 8 +++--- .../impl/consumer/DefaultMQPullConsumerImpl.java | 6 ++++- .../impl/consumer/DefaultMQPushConsumerImpl.java | 29 +++++++++++++++++----- .../client/impl/consumer/PullAPIWrapper.java | 8 +++--- .../client/impl/consumer/RebalanceImpl.java | 7 +++--- .../impl/producer/DefaultMQProducerImpl.java | 6 ++--- 6 files changed, 43 insertions(+), 21 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java index 6b76238..7364856 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java @@ -199,10 +199,10 @@ public class RemoteBrokerOffsetStore implements OffsetStore { @Override public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); } if (findBrokerResult != null) { @@ -226,11 +226,11 @@ public class RemoteBrokerOffsetStore implements OffsetStore { private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName()); + findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq)); } if (findBrokerResult != null) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index d04b040..89ce7db 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -578,7 +578,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { - String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) + String destBrokerName = brokerName; + if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(destBrokerName)) { + destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPullConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId()))); + } + String brokerAddr = (null != destBrokerName) ? this.mQClientFactory.findBrokerAddressInPublish(destBrokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); if (UtilAll.isBlank(consumerGroup)) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index dafa555..d00c4e8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -725,6 +725,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { desBrokerName = tmpBrokerName; } } + if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(desBrokerName)) { + desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(msg.getTopic(), msg.getBrokerName(), msg.getQueueId()))); + } + String brokerAddr = null; if (null != desBrokerName) { brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(desBrokerName); @@ -765,15 +769,21 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { long queueOffset = ExtraInfoUtil.getQueueOffset(extraInfoStrs); String topic = message.getTopic(); + String desBrokerName = brokerName; + if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)) { + desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(topic, brokerName, queueId))); + } + + FindBrokerResult - findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); + findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); - findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); + findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true); } if (findBrokerResult == null) { - log.error("The broker[" + brokerName + "] not exist"); + log.error("The broker[" + desBrokerName + "] not exist"); return; } @@ -806,11 +816,17 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo); String brokerName = ExtraInfoUtil.getBrokerName(extraInfoStrs); int queueId = ExtraInfoUtil.getQueueId(extraInfoStrs); + + String desBrokerName = brokerName; + if (MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME.equals(brokerName)) { + desBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(this.defaultMQPushConsumer.queueWithNamespace(new MessageQueue(topic, brokerName, queueId))); + } + FindBrokerResult - findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); + findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); - findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true); + findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(desBrokerName, MixAll.MASTER_ID, true); } if (findBrokerResult != null) { ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader(); @@ -820,10 +836,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { requestHeader.setConsumerGroup(consumerGroup); requestHeader.setExtraInfo(extraInfo); requestHeader.setInvisibleTime(invisibleTime); + //here the broker should be polished this.mQClientFactory.getMQClientAPIImpl().changeInvisibleTimeAsync(brokerName, findBrokerResult.getBrokerAddr(), requestHeader, ASYNC_TIMEOUT, callback); return; } - throw new MQClientException("The broker[" + brokerName + "] not exist", null); + throw new MQClientException("The broker[" + desBrokerName + "] not exist", null); } public int getMaxReconsumeTimes() { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index a379b1c..6d966a6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -239,12 +239,12 @@ public class PullAPIWrapper { int queueId = mq.getQueueId(); FindBrokerResult findBrokerResult = - this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), + this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), this.recalculatePullFromWhichNode(mq), false); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); findBrokerResult = - this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), + this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), this.recalculatePullFromWhichNode(mq), false); } @@ -373,10 +373,10 @@ public class PullAPIWrapper { public void popAsync(MessageQueue mq, long invisibleTime, int maxNums, String consumerGroup, long timeout, PopCallback popCallback, boolean poll, int initMode, boolean order, String expressionType, String expression) throws MQClientException, RemotingException, InterruptedException { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true); if (null == findBrokerResult) { this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic()); - findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); + findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true); } if (findBrokerResult != null) { PopMessageRequestHeader requestHeader = new PopMessageRequestHeader(); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index ab0d885..5788d1c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@ -76,7 +76,7 @@ public abstract class RebalanceImpl { } public void unlock(final MessageQueue mq, final boolean oneway) { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true); if (findBrokerResult != null) { UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody(); requestBody.setConsumerGroup(this.consumerGroup); @@ -141,7 +141,8 @@ public abstract class RebalanceImpl { continue; } - Set<MessageQueue> mqs = result.get(mq.getBrokerName()); + String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq); + Set<MessageQueue> mqs = result.get(destBrokerName); if (null == mqs) { mqs = new HashSet<MessageQueue>(); result.put(mq.getBrokerName(), mqs); @@ -154,7 +155,7 @@ public abstract class RebalanceImpl { } public boolean lock(final MessageQueue mq) { - FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true); + FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true); if (findBrokerResult != null) { LockBatchRequestBody requestBody = new LockBatchRequestBody(); requestBody.setConsumerGroup(this.consumerGroup); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 52a2d9c..7719597 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -721,11 +721,11 @@ public class DefaultMQProducerImpl implements MQProducerInner { final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); - String brokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq); - String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName); + String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(mq); + String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); - brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(brokerName); + brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName); } SendMessageContext context = null;
