This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/4.9.x by this push:
new 8d2c6613be [ISSUE #7713] MQFaultStrategy check queue if writable
8d2c6613be is described below
commit 8d2c6613be0500b1a9289a9d8db43d2d12642a74
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Thu Jan 25 14:54:07 2024 +0800
[ISSUE #7713] MQFaultStrategy check queue if writable
---
.../apache/rocketmq/client/impl/producer/TopicPublishInfo.java | 3 ++-
.../org/apache/rocketmq/client/latency/MQFaultStrategy.java | 10 +++++++---
2 files changed, 9 insertions(+), 4 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 2f8337edef..22d4eb9234 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl.producer;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.common.ThreadLocalIndex;
+import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -95,7 +96,7 @@ public class TopicPublishInfo {
public int getQueueIdByBroker(final String brokerName) {
for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
final QueueData queueData =
this.topicRouteData.getQueueDatas().get(i);
- if (queueData.getBrokerName().equals(brokerName)) {
+ if (PermName.isWriteable(queueData.getPerm()) &&
queueData.getBrokerName().equals(brokerName)) {
return queueData.getWriteQueueNums();
}
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index ea3d07e6d0..b6915cda07 100644
---
a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++
b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -73,10 +73,14 @@ public class MQFaultStrategy {
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
- mq.setBrokerName(notBestBroker);
-
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
+ MessageQueue selectedMessageQueue = new MessageQueue();
+ selectedMessageQueue.setTopic(mq.getTopic());
+ selectedMessageQueue.setBrokerName(notBestBroker);
+
selectedMessageQueue.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() %
writeQueueNums);
+ return selectedMessageQueue;
+ } else {
+ return mq;
}
- return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}