This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7fc5452e0d [ISSUE #9970] Refactor the MessageQueueSelector to support 
more flexible queue selection strategy (#9971)
7fc5452e0d is described below

commit 7fc5452e0d9b4f5b86a7babc2ccad2c60cf0dac1
Author: qianye <[email protected]>
AuthorDate: Wed Jan 7 15:40:09 2026 +0800

    [ISSUE #9970] Refactor the MessageQueueSelector to support more flexible 
queue selection strategy (#9971)
---
 .../RocksdbGroupConfigTransferTest.java            |  52 +--
 .../rocketmq/client/latency/MQFaultStrategy.java   |   8 +-
 .../service/route/AddressableMessageQueue.java     |  49 +--
 .../route/DefaultMessageQueuePriorityProvider.java |  25 ++
 .../proxy/service/route/MessageQueuePenalizer.java | 134 ++++++
 .../route/MessageQueuePriorityProvider.java        |  84 ++++
 .../proxy/service/route/MessageQueueSelector.java  | 115 ++---
 .../proxy/service/route/MessageQueueView.java      |  22 +-
 .../proxy/service/route/TopicRouteService.java     |  64 +--
 .../grpc/v2/producer/SendMessageActivityTest.java  |   8 +-
 .../service/route/MessageQueuePenalizerTest.java   | 472 +++++++++++++++++++++
 .../route/MessageQueuePriorityProviderTest.java    | 311 ++++++++++++++
 .../service/route/MessageQueueSelectorTest.java    |   8 +-
 13 files changed, 1168 insertions(+), 184 deletions(-)

diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
index 4fbec13860..b476cb205e 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
@@ -17,6 +17,15 @@
 
 package org.apache.rocketmq.broker.subscription;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -34,15 +43,6 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -78,24 +78,28 @@ public class RocksdbGroupConfigTransferTest {
         if (notToBeExecuted()) {
             return;
         }
-        Path pathToBeDeleted = Paths.get(basePath);
-
-        try {
-            Files.walk(pathToBeDeleted)
-                    .sorted(Comparator.reverseOrder())
-                    .forEach(path -> {
-                        try {
-                            Files.delete(path);
-                        } catch (IOException e) {
-                            // ignore
-                        }
-                    });
-        } catch (IOException e) {
-            // ignore
-        }
+
         if (rocksDBSubscriptionGroupManager != null) {
             rocksDBSubscriptionGroupManager.stop();
         }
+
+        Path root = Paths.get(basePath);
+        if (Files.notExists(root)) {
+            return;
+        }
+
+        try (Stream<Path> walk = Files.walk(root)) {
+            walk.sorted(Comparator.reverseOrder())
+                .forEach(p -> {
+                    try {
+                        Files.deleteIfExists(p);
+                    } catch (IOException e) {
+                        // ignore
+                    }
+                });
+        } catch (IOException e) {
+                // ignore
+        }
     }
 
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java 
b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index 69fb533e5a..76875378df 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -21,8 +21,9 @@ import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo.QueueFilter;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
 
-public class MQFaultStrategy {
+public class MQFaultStrategy implements StartAndShutdown {
     private LatencyFaultTolerance<String> latencyFaultTolerance;
     private volatile boolean sendLatencyFaultEnable;
     private volatile boolean startDetectorEnable;
@@ -130,6 +131,11 @@ public class MQFaultStrategy {
         this.latencyFaultTolerance.startDetector();
     }
 
+    @Override
+    public void start() throws Exception {
+        this.startDetector();
+    }
+
     public void shutdown() {
         this.latencyFaultTolerance.shutdown();
     }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java
index ca877f3278..19f2c0db85 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java
@@ -17,22 +17,27 @@
 package org.apache.rocketmq.proxy.service.route;
 
 import com.google.common.base.MoreObjects;
-import java.util.Objects;
 import org.apache.rocketmq.common.message.MessageQueue;
 
-public class AddressableMessageQueue implements 
Comparable<AddressableMessageQueue> {
-
-    private final MessageQueue messageQueue;
+public class AddressableMessageQueue extends MessageQueue {
     private final String brokerAddr;
 
     public AddressableMessageQueue(MessageQueue messageQueue, String 
brokerAddr) {
-        this.messageQueue = messageQueue;
+        super(messageQueue);
         this.brokerAddr = brokerAddr;
     }
 
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+    public MessageQueue getMessageQueue() {
+        return new MessageQueue(getTopic(), getBrokerName(), getQueueId());
+    }
+
     @Override
-    public int compareTo(AddressableMessageQueue o) {
-        return messageQueue.compareTo(o.messageQueue);
+    public int hashCode() {
+        return super.hashCode();
     }
 
     @Override
@@ -43,39 +48,13 @@ public class AddressableMessageQueue implements 
Comparable<AddressableMessageQue
         if (!(o instanceof AddressableMessageQueue)) {
             return false;
         }
-        AddressableMessageQueue queue = (AddressableMessageQueue) o;
-        return Objects.equals(messageQueue, queue.messageQueue);
-    }
-
-    @Override
-    public int hashCode() {
-        return messageQueue == null ? 1 : messageQueue.hashCode();
-    }
-
-    public int getQueueId() {
-        return this.messageQueue.getQueueId();
-    }
-
-    public String getBrokerName() {
-        return this.messageQueue.getBrokerName();
-    }
-
-    public String getTopic() {
-        return messageQueue.getTopic();
-    }
-
-    public MessageQueue getMessageQueue() {
-        return messageQueue;
-    }
-
-    public String getBrokerAddr() {
-        return brokerAddr;
+        return super.equals(o);
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
-            .add("messageQueue", messageQueue)
+            .add("messageQueue", super.toString())
             .add("brokerAddr", brokerAddr)
             .toString();
     }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java
new file mode 100644
index 0000000000..90b0114f61
--- /dev/null
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.route;
+
+public class DefaultMessageQueuePriorityProvider implements 
MessageQueuePriorityProvider<AddressableMessageQueue> {
+    @Override
+    public int priorityOf(AddressableMessageQueue queue) {
+        return 0; // same priority for every queue, so buildPriorityGroups puts all queues into a single group
+    }
+}
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java
new file mode 100644
index 0000000000..d53056971d
--- /dev/null
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.proxy.service.route;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+@FunctionalInterface
+public interface MessageQueuePenalizer<Q extends MessageQueue> {
+
+    /**
+     * Returns the penalty value for the given MessageQueue; lower is better.
+     */
+    int penaltyOf(Q messageQueue);
+
+    /**
+     * Aggregates penalties from multiple penalizers for the same MessageQueue 
(by summing them up).
+     */
+    static <Q extends MessageQueue> int evaluatePenalty(Q messageQueue, List<MessageQueuePenalizer<Q>> penalizers) {
+        Objects.requireNonNull(messageQueue, "messageQueue");
+        if (penalizers == null || penalizers.isEmpty()) {
+            return 0; // no penalizers configured: the queue carries no penalty
+        }
+        long sum = 0; // accumulate in a long so large penalties cannot wrap around into a negative int
+        for (MessageQueuePenalizer<Q> p : penalizers) {
+            sum += p.penaltyOf(messageQueue);
+        }
+        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, sum)); // saturate at the int bounds
+    }
+
+    /**
+     * Selects the queue with the lowest evaluated penalty from the given 
queue list.
+     *
+     * <p>The method iterates through all queues exactly once, but starts from 
a rotating index
+     * derived from {@code startIndex} (round-robin) to avoid always scanning 
from position 0.</p>
+     *
+     * <p>For each queue, it computes a penalty via {@link #evaluatePenalty} 
using
+     * the provided {@code penalizers}. The queue with the smallest penalty is 
selected.</p>
+     *
+     * <p>Short-circuit rule: if any queue has a {@code penalty <= 0}, it is 
returned immediately,
+     * since no better result than 0 is expected.</p>
+     *
+     * @param queues candidate queues to select from
+     * @param penalizers penalty evaluators applied to each queue
+     * @param startIndex atomic counter used to determine the rotating start 
position (round-robin)
+     * @param <Q> queue type
+     * @return a {@code Pair} of (selected queue, penalty), or {@code null} if 
{@code queues} is null/empty
+     */
+    static <Q extends MessageQueue> Pair<Q, Integer> selectLeastPenalty(List<Q> queues,
+        List<MessageQueuePenalizer<Q>> penalizers, AtomicInteger startIndex) {
+        if (queues == null || queues.isEmpty()) {
+            return null;
+        }
+        Q bestQueue = null;
+        int bestPenalty = Integer.MAX_VALUE;
+        for (int i = 0; i < queues.size(); i++) {
+            int index = Math.floorMod(startIndex.getAndIncrement(), queues.size());
+            Q messageQueue = queues.get(index);
+            int penalty = evaluatePenalty(messageQueue, penalizers);
+            // Short-circuit: cannot do better than 0
+            if (penalty <= 0) {
+                return Pair.of(messageQueue, penalty);
+            }
+            // Strict '<' keeps the first queue probed among equal penalties.
+            if (penalty < bestPenalty) {
+                bestPenalty = penalty;
+                bestQueue = messageQueue;
+            }
+        }
+        // A full scan advanced the counter by exactly queues.size(), i.e. back to the same start slot; step once more so ties rotate.
+        startIndex.getAndIncrement();
+        return Pair.of(bestQueue, bestPenalty);
+    }
+
+    /**
+     * Selects a queue with the lowest computed penalty from multiple priority 
groups.
+     *
+     * <p>The input {@code queuesWithPriority} is a list of queue groups 
ordered by priority.
+     * For each priority group, this method delegates to {@link 
#selectLeastPenalty} to pick the best queue
+     * within that group and obtain its penalty.</p>
+     *
+     * <p>Short-circuit rule: if any priority group yields a queue whose 
{@code penalty <= 0},
+     * that result is returned immediately.</p>
+     *
+     * <p>Otherwise, it returns the queue with the smallest positive penalty 
among all groups.
+     * If multiple groups produce the same minimum penalty, the first 
encountered one wins.</p>
+     *
+     * @param queuesWithPriority priority-ordered groups of queues; each inner 
list represents one priority level
+     * @param penalizers penalty calculators used by {@code 
selectLeastPenalty} to score queues
+     * @param startIndex round-robin start index forwarded to {@code 
selectLeastPenalty} to reduce contention/hotspots
+     * @param <Q> queue type
+     * @return a {@code Pair} of (selected queue, penalty), or {@code null} if 
{@code queuesWithPriority} is null/empty
+     */
+    static <Q extends MessageQueue> Pair<Q, Integer> selectLeastPenaltyWithPriority(List<List<Q>> queuesWithPriority,
+        List<MessageQueuePenalizer<Q>> penalizers, AtomicInteger startIndex) {
+        if (queuesWithPriority == null || queuesWithPriority.isEmpty()) {
+            return null;
+        }
+        if (queuesWithPriority.size() == 1) {
+            return selectLeastPenalty(queuesWithPriority.get(0), penalizers, startIndex); // single group: nothing to compare across
+        }
+        Q bestQueue = null;
+        int bestPenalty = Integer.MAX_VALUE;
+        for (List<Q> queues : queuesWithPriority) {
+            Pair<Q, Integer> queueAndPenalty = selectLeastPenalty(queues, penalizers, startIndex);
+            int penalty = queueAndPenalty == null ? Integer.MAX_VALUE : queueAndPenalty.getRight(); // null result: null/empty group, never wins
+            if (penalty <= 0) {
+                return queueAndPenalty;
+            }
+            if (penalty < bestPenalty) {
+                bestPenalty = penalty;
+                bestQueue = queueAndPenalty.getLeft();
+            }
+        }
+        return Pair.of(bestQueue, bestPenalty);
+    }
+}
\ No newline at end of file
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java
new file mode 100644
index 0000000000..57b6e65fe5
--- /dev/null
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.route;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * A functional interface for providing priority values for message queues.
+ * This interface allows custom priority determination logic to be applied to 
message queues,
+ * enabling queue selection and routing based on priority levels.
+ * <p>
+ * The priority value follows the convention that smaller numeric values 
indicate higher priority.
+ * For example, priority 0 is higher than priority 1.
+ * </p>
+ *
+ * @param <Q> the type of message queue, must extend {@link MessageQueue}
+ */
+@FunctionalInterface
+public interface MessageQueuePriorityProvider<Q extends MessageQueue> {
+
+    /**
+     * Determines the priority value of the given message queue.
+     * <p>
+     * Smaller values indicate higher priority. For example:
+     * <ul>
+     *   <li>Priority 0: Highest priority</li>
+     *   <li>Priority 1: Medium priority</li>
+     *   <li>Priority 2: Lower priority</li>
+     * </ul>
+     * </p>
+     *
+     * @param q the message queue to evaluate
+     * @return the priority value, where smaller values indicate higher 
priority
+     */
+    int priorityOf(Q q);
+
+    /**
+     * Groups message queues by their priority levels and returns them in 
priority order.
+     * <p>
+     * This static utility method takes a list of message queues and a 
priority provider,
+     * then organizes the queues into groups based on their priority values.
+     * The returned list is ordered from highest priority to lowest priority.
+     * </p>
+     *
+     * @param <Q>      the type of message queue, must extend {@link 
MessageQueue}
+     * @param queues   the list of message queues to group by priority, can be 
null or empty
+     * @param provider the priority provider to determine the priority of each 
queue
+     * @return a list of lists, where each inner list contains queues of the 
same priority level,
+     *         ordered from highest priority (smallest value) to lowest 
priority (largest value).
+     *         Returns an empty list if the input queues are null or empty.
+     */
+    static <Q extends MessageQueue> List<List<Q>> buildPriorityGroups(List<Q> queues, MessageQueuePriorityProvider<Q> provider) {
+        if (queues == null || queues.isEmpty()) {
+            return Collections.emptyList();
+        }
+        // TreeMap iterates its keys in ascending order, so the smallest priority value (highest priority) comes first.
+        Map<Integer, List<Q>> groupsByPriority = new TreeMap<>();
+        for (Q queue : queues) {
+            List<Q> group = groupsByPriority.computeIfAbsent(provider.priorityOf(queue), unused -> new ArrayList<>());
+            group.add(queue);
+        }
+        return new ArrayList<>(groupsByPriority.values());
+    }
+}
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
index f25fb907ef..0b028fa461 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.proxy.service.route;
 
 import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
 import com.google.common.math.IntMath;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -30,13 +29,16 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
-import org.apache.rocketmq.client.latency.MQFaultStrategy;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.protocol.route.QueueData;
 
+import static 
org.apache.rocketmq.proxy.service.route.MessageQueuePenalizer.selectLeastPenaltyWithPriority;
+import static 
org.apache.rocketmq.proxy.service.route.MessageQueuePriorityProvider.buildPriorityGroups;
+
 public class MessageQueueSelector {
     private static final int BROKER_ACTING_QUEUE_ID = -1;
 
@@ -47,9 +49,18 @@ public class MessageQueueSelector {
     private final Map<String, AddressableMessageQueue> brokerNameQueueMap = 
new ConcurrentHashMap<>();
     private final AtomicInteger queueIndex;
     private final AtomicInteger brokerIndex;
-    private MQFaultStrategy mqFaultStrategy;
+    private final List<MessageQueuePenalizer<AddressableMessageQueue>> 
penalizers = new ArrayList<>();
+
+    // ordered by priority asc (smaller => higher priority)
+    private final List<List<AddressableMessageQueue>> queuesWithPriority;
+    private final List<List<AddressableMessageQueue>> 
brokerActingQueuesWithPriority;
+
+    public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean 
read) {
+        this(topicRouteWrapper, read, null);
+    }
 
-    public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, 
MQFaultStrategy mqFaultStrategy, boolean read) {
+    public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean 
read,
+        MessageQueuePriorityProvider<AddressableMessageQueue> 
priorityProvider) {
         if (read) {
             this.queues.addAll(buildRead(topicRouteWrapper));
         } else {
@@ -59,7 +70,12 @@ public class MessageQueueSelector {
         Random random = new Random();
         this.queueIndex = new AtomicInteger(random.nextInt());
         this.brokerIndex = new AtomicInteger(random.nextInt());
-        this.mqFaultStrategy = mqFaultStrategy;
+
+        if (priorityProvider == null) {
+            priorityProvider = new DefaultMessageQueuePriorityProvider();
+        }
+        this.queuesWithPriority = buildPriorityGroups(queues, 
priorityProvider);
+        this.brokerActingQueuesWithPriority = 
buildPriorityGroups(brokerActingQueues, priorityProvider);
     }
 
     private static List<AddressableMessageQueue> buildRead(TopicRouteWrapper 
topicRoute) {
@@ -138,7 +154,7 @@ public class MessageQueueSelector {
     private void buildBrokerActingQueues(String topic, 
List<AddressableMessageQueue> normalQueues) {
         for (AddressableMessageQueue mq : normalQueues) {
             AddressableMessageQueue brokerActingQueue = new 
AddressableMessageQueue(
-                new MessageQueue(topic, mq.getMessageQueue().getBrokerName(), 
BROKER_ACTING_QUEUE_ID),
+                new MessageQueue(topic, mq.getBrokerName(), 
BROKER_ACTING_QUEUE_ID),
                 mq.getBrokerAddr());
 
             if (!brokerActingQueues.contains(brokerActingQueue)) {
@@ -160,38 +176,15 @@ public class MessageQueueSelector {
     }
 
     public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) {
-        if (mqFaultStrategy != null && 
mqFaultStrategy.isSendLatencyFaultEnable()) {
-            List<MessageQueue> messageQueueList = null;
-            MessageQueue messageQueue = null;
+        if (CollectionUtils.isNotEmpty(penalizers)) {
+            Pair<AddressableMessageQueue, Integer> queueAndPenalty;
             if (onlyBroker) {
-                messageQueueList = 
transferAddressableQueues(brokerActingQueues);
+                queueAndPenalty = 
selectLeastPenaltyWithPriority(brokerActingQueuesWithPriority, penalizers, 
brokerIndex);
             } else {
-                messageQueueList = transferAddressableQueues(queues);
+                queueAndPenalty = 
selectLeastPenaltyWithPriority(queuesWithPriority, penalizers, queueIndex);
             }
-            AddressableMessageQueue addressableMessageQueue = null;
-
-            // use both available filter.
-            messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker 
? brokerIndex : queueIndex,
-                    mqFaultStrategy.getAvailableFilter(), 
mqFaultStrategy.getReachableFilter());
-            addressableMessageQueue = transferQueue2Addressable(messageQueue);
-            if (addressableMessageQueue != null) {
-                return addressableMessageQueue;
-            }
-
-            // use available filter.
-            messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker 
? brokerIndex : queueIndex,
-                    mqFaultStrategy.getAvailableFilter());
-            addressableMessageQueue = transferQueue2Addressable(messageQueue);
-            if (addressableMessageQueue != null) {
-                return addressableMessageQueue;
-            }
-
-            // no available filter, then use reachable filter.
-            messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker 
? brokerIndex : queueIndex,
-                    mqFaultStrategy.getReachableFilter());
-            addressableMessageQueue = transferQueue2Addressable(messageQueue);
-            if (addressableMessageQueue != null) {
-                return addressableMessageQueue;
+            if (queueAndPenalty != null && queueAndPenalty.getLeft() != null) {
+                return queueAndPenalty.getLeft();
             }
         }
 
@@ -199,46 +192,6 @@ public class MessageQueueSelector {
         return selectOne(onlyBroker);
     }
 
-    private MessageQueue selectOneMessageQueue(List<MessageQueue> 
messageQueueList, AtomicInteger sendQueue, 
TopicPublishInfo.QueueFilter...filter) {
-        if (messageQueueList == null || messageQueueList.isEmpty()) {
-            return null;
-        }
-        if (filter != null && filter.length != 0) {
-            for (int i = 0; i < messageQueueList.size(); i++) {
-                int index = Math.abs(sendQueue.incrementAndGet() % 
messageQueueList.size());
-                MessageQueue mq = messageQueueList.get(index);
-                boolean filterResult = true;
-                for (TopicPublishInfo.QueueFilter f: filter) {
-                    Preconditions.checkNotNull(f);
-                    filterResult &= f.filter(mq);
-                }
-                if (filterResult) {
-                    return mq;
-                }
-            }
-        }
-        return null;
-    }
-
-    public List<MessageQueue> 
transferAddressableQueues(List<AddressableMessageQueue> 
addressableMessageQueueList) {
-        if (addressableMessageQueueList == null) {
-            return null;
-        }
-
-        return addressableMessageQueueList.stream()
-                .map(AddressableMessageQueue::getMessageQueue)
-                .collect(Collectors.toList());
-    }
-
-    private AddressableMessageQueue transferQueue2Addressable(MessageQueue 
messageQueue) {
-        for (AddressableMessageQueue amq: queues) {
-            if (amq.getMessageQueue().equals(messageQueue)) {
-                return amq;
-            }
-        }
-        return null;
-    }
-
     public AddressableMessageQueue selectNextOne(AddressableMessageQueue last) 
{
         boolean onlyBroker = last.getQueueId() < 0;
         AddressableMessageQueue newOne = last;
@@ -275,12 +228,10 @@ public class MessageQueueSelector {
         return brokerActingQueues;
     }
 
-    public MQFaultStrategy getMQFaultStrategy() {
-        return mqFaultStrategy;
-    }
-
-    public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
-        this.mqFaultStrategy = mqFaultStrategy;
+    public void addPenalizer(MessageQueuePenalizer<AddressableMessageQueue> 
penalizer) {
+        if (penalizer != null) {
+            this.penalizers.add(penalizer);
+        }
     }
 
     @Override
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
index 898e529f8c..a0d768d6da 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
@@ -17,7 +17,8 @@
 package org.apache.rocketmq.proxy.service.route;
 
 import com.google.common.base.MoreObjects;
-import org.apache.rocketmq.client.latency.MQFaultStrategy;
+import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
 
 public class MessageQueueView {
@@ -27,11 +28,24 @@ public class MessageQueueView {
     private final MessageQueueSelector writeSelector;
     private final TopicRouteWrapper topicRouteWrapper;
 
-    public MessageQueueView(String topic, TopicRouteData topicRouteData, 
MQFaultStrategy mqFaultStrategy) {
+
+    public MessageQueueView(String topic, TopicRouteData topicRouteData, 
List<MessageQueuePenalizer<AddressableMessageQueue>> penalizer) {
+        this(topic, topicRouteData, penalizer, null);
+    }
+
+    public MessageQueueView(String topic, TopicRouteData topicRouteData, 
List<MessageQueuePenalizer<AddressableMessageQueue>> penalizer,
+        MessageQueuePriorityProvider<AddressableMessageQueue> 
priorityProvider) {
         this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic);
 
-        this.readSelector = new MessageQueueSelector(topicRouteWrapper, 
mqFaultStrategy, true);
-        this.writeSelector = new MessageQueueSelector(topicRouteWrapper, 
mqFaultStrategy, false);
+        this.readSelector = new MessageQueueSelector(topicRouteWrapper, true, 
priorityProvider);
+        this.writeSelector = new MessageQueueSelector(topicRouteWrapper, 
false, priorityProvider);
+
+        if (CollectionUtils.isNotEmpty(penalizer)) {
+            for (MessageQueuePenalizer<AddressableMessageQueue> p : penalizer) 
{
+                this.readSelector.addPenalizer(p);
+                this.writeSelector.addPenalizer(p);
+            }
+        }
     }
 
     public TopicRouteData getTopicRouteData() {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index bcdf8140bc..dae3005746 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -19,11 +19,11 @@ package org.apache.rocketmq.proxy.service.route;
 import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
-
+import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.ClientConfig;
@@ -32,12 +32,10 @@ import 
org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.client.latency.MQFaultStrategy;
 import org.apache.rocketmq.client.latency.Resolver;
 import org.apache.rocketmq.client.latency.ServiceDetector;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
 import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
-import org.apache.rocketmq.common.utils.ThreadUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.proxy.common.Address;
@@ -53,19 +51,15 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 public abstract class TopicRouteService extends AbstractStartAndShutdown {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
 
-    private final MQClientAPIFactory mqClientAPIFactory;
-    private MQFaultStrategy mqFaultStrategy;
-
+    private final MQFaultStrategy mqFaultStrategy;
     protected final LoadingCache<String /* topicName */, MessageQueueView> 
topicCache;
-    protected final ScheduledExecutorService scheduledExecutorService;
     protected final ThreadPoolExecutor cacheRefreshExecutor;
+    protected final List<MessageQueuePenalizer<AddressableMessageQueue>> 
penalizers = new ArrayList<>();
+    protected MessageQueuePriorityProvider<AddressableMessageQueue> 
priorityProvider = new DefaultMessageQueuePriorityProvider();
 
     public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
 
-        this.scheduledExecutorService = 
ThreadUtils.newSingleThreadScheduledExecutor(
-            new ThreadFactoryImpl("TopicRouteService_")
-        );
         this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
             config.getTopicRouteServiceThreadPoolNums(),
             config.getTopicRouteServiceThreadPoolNums(),
@@ -74,7 +68,6 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
             "TopicRouteCacheRefresh",
             config.getTopicRouteServiceThreadPoolQueueCapacity()
         );
-        this.mqClientAPIFactory = mqClientAPIFactory;
 
         this.topicCache = 
Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum())
             
.expireAfterAccess(config.getTopicRouteServiceCacheExpiredSeconds(), 
TimeUnit.SECONDS)
@@ -134,6 +127,8 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
                 }
             }
         }, serviceDetector);
+
+        
this.penalizers.addAll(buildPenalizerByMQFaultStrategy(mqFaultStrategy));
         this.init();
     }
 
@@ -146,22 +141,7 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
     }
 
     protected void init() {
-        this.appendShutdown(this.scheduledExecutorService::shutdown);
-        this.appendStartAndShutdown(this.mqClientAPIFactory);
-    }
-
-    @Override
-    public void shutdown() throws Exception {
-        if (this.mqFaultStrategy.isStartDetectorEnable()) {
-            mqFaultStrategy.shutdown();
-        }
-    }
-
-    @Override
-    public void start() throws Exception {
-        if (this.mqFaultStrategy.isStartDetectorEnable()) {
-            this.mqFaultStrategy.startDetector();
-        }
+        this.appendStartAndShutdown(this.mqFaultStrategy);
     }
 
     public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig 
proxyConfig) {
@@ -220,10 +200,36 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
 
     protected MessageQueueView buildMessageQueueView(String topic, 
TopicRouteData topicRouteData) {
         if (isTopicRouteValid(topicRouteData)) {
-            MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, 
TopicRouteService.this.getMqFaultStrategy());
+            MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, 
this.penalizers, this.priorityProvider);
             log.debug("load topic route from namesrv. topic: {}, queue: {}", 
topic, tmp);
             return tmp;
         }
         return MessageQueueView.WRAPPED_EMPTY_QUEUE;
     }
+
+    public void 
setPriorityProvider(MessageQueuePriorityProvider<AddressableMessageQueue> 
priorityProvider) {
+        this.priorityProvider = priorityProvider;
+    }
+
+    public void addPenalizer(MessageQueuePenalizer<AddressableMessageQueue> 
penalizer) {
+        this.penalizers.add(penalizer);
+    }
+
+    @VisibleForTesting
+    public static List<MessageQueuePenalizer<AddressableMessageQueue>> 
buildPenalizerByMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
+        List<MessageQueuePenalizer<AddressableMessageQueue>> penalizers = new 
ArrayList<>();
+        penalizers.add(messageQueue -> {
+            if (!mqFaultStrategy.isSendLatencyFaultEnable() || 
mqFaultStrategy.getAvailableFilter().filter(messageQueue)) {
+                return 0;
+            }
+            return 10;
+        });
+        penalizers.add(messageQueue -> {
+            if (!mqFaultStrategy.isSendLatencyFaultEnable() || 
mqFaultStrategy.getReachableFilter().filter(messageQueue)) {
+                return 0;
+            }
+            return 100;
+        });
+        return penalizers;
+    }
 }
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
index a64867ddfe..870aa0424f 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java
@@ -59,6 +59,7 @@ import org.assertj.core.util.Lists;
 import org.junit.Before;
 import org.junit.Test;
 
+import static 
org.apache.rocketmq.proxy.service.route.TopicRouteService.buildPenalizerByMQFaultStrategy;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThrows;
@@ -379,7 +380,7 @@ public class SendMessageActivityTest extends 
BaseActivityTest {
         MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class);
         
when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy);
         when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false);
-        MessageQueueView messageQueueView = new MessageQueueView(TOPIC, 
topicRouteData, topicRouteService.getMqFaultStrategy());
+        MessageQueueView messageQueueView = new MessageQueueView(TOPIC, 
topicRouteData, null);
 
         AddressableMessageQueue firstSelect = 
selector.select(ProxyContext.create(), messageQueueView);
         AddressableMessageQueue secondSelect = 
selector.select(ProxyContext.create(), messageQueueView);
@@ -415,10 +416,7 @@ public class SendMessageActivityTest extends 
BaseActivityTest {
         mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, true);
         mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, false);
 
-        TopicRouteService topicRouteService = mock(TopicRouteService.class);
-        
when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy);
-        MessageQueueView messageQueueView = new MessageQueueView(TOPIC, 
topicRouteData, topicRouteService.getMqFaultStrategy());
-
+        MessageQueueView messageQueueView = new MessageQueueView(TOPIC, 
topicRouteData, buildPenalizerByMQFaultStrategy(mqFaultStrategy));
 
         AddressableMessageQueue firstSelect = 
selector.select(ProxyContext.create(), messageQueueView);
         assertEquals(firstSelect.getBrokerName(), BROKER_NAME2);
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java
new file mode 100644
index 0000000000..f31d973cce
--- /dev/null
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.route;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class MessageQueuePenalizerTest {
+
+    /**
+     * Test evaluatePenalty with null messageQueue should throw 
NullPointerException
+     */
+    @Test(expected = NullPointerException.class)
+    public void testEvaluatePenalty_NullMessageQueue() {
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = new 
ArrayList<>();
+        penalizers.add(mq -> 10);
+        MessageQueuePenalizer.evaluatePenalty(null, penalizers);
+    }
+
+    /**
+     * Test evaluatePenalty with null penalizers should return 0
+     */
+    @Test
+    public void testEvaluatePenalty_NullPenalizers() {
+        MessageQueue mq = new MessageQueue("topic", "broker", 0);
+        int penalty = MessageQueuePenalizer.evaluatePenalty(mq, null);
+        assertEquals(0, penalty);
+    }
+
+    /**
+     * Test evaluatePenalty with empty penalizers should return 0
+     */
+    @Test
+    public void testEvaluatePenalty_EmptyPenalizers() {
+        MessageQueue mq = new MessageQueue("topic", "broker", 0);
+        int penalty = MessageQueuePenalizer.evaluatePenalty(mq, 
Collections.emptyList());
+        assertEquals(0, penalty);
+    }
+
+    /**
+     * Test evaluatePenalty aggregates penalties from multiple penalizers by 
summing them up
+     */
+    @Test
+    public void testEvaluatePenalty_MultiplePenalizers() {
+        MessageQueue mq = new MessageQueue("topic", "broker", 0);
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = Arrays.asList(
+            q -> 10,
+            q -> 20,
+            q -> 5
+        );
+        int penalty = MessageQueuePenalizer.evaluatePenalty(mq, penalizers);
+        assertEquals(35, penalty);
+    }
+
+    /**
+     * Test evaluatePenalty with negative penalties (sum should still work)
+     */
+    @Test
+    public void testEvaluatePenalty_NegativePenalties() {
+        MessageQueue mq = new MessageQueue("topic", "broker", 0);
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = Arrays.asList(
+            q -> -5,
+            q -> 10,
+            q -> -3
+        );
+        int penalty = MessageQueuePenalizer.evaluatePenalty(mq, penalizers);
+        assertEquals(2, penalty);
+    }
+
+    /**
+     * Test selectLeastPenalty with null queues should return null
+     */
+    @Test
+    public void testSelectLeastPenalty_NullQueues() {
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(mq -> 10);
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenalty(null, penalizers, startIndex);
+        assertNull(result);
+    }
+
+    /**
+     * Test selectLeastPenalty with empty queues should return null
+     */
+    @Test
+    public void testSelectLeastPenalty_EmptyQueues() {
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(mq -> 10);
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenalty(
+            Collections.emptyList(), penalizers, startIndex);
+        assertNull(result);
+    }
+
+    /**
+     * Test selectLeastPenalty selects the queue with the lowest penalty
+     */
+    @Test
+    public void testSelectLeastPenalty_LowestPenalty() {
+        MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+        MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+        MessageQueue mq2 = new MessageQueue("topic", "broker", 2);
+        List<MessageQueue> queues = Arrays.asList(mq0, mq1, mq2);
+
+        // Penalizer that assigns different penalties based on queue id
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(
+            mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? 10 : 30)
+        );
+
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex);
+
+        assertNotNull(result);
+        assertEquals(mq1, result.getLeft());
+        assertEquals(10, result.getRight().intValue());
+    }
+
+    /**
+     * Test selectLeastPenalty short-circuits when penalty <= 0
+     */
+    @Test
+    public void testSelectLeastPenalty_ShortCircuitZeroPenalty() {
+        MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+        MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+        MessageQueue mq2 = new MessageQueue("topic", "broker", 2);
+        List<MessageQueue> queues = Arrays.asList(mq0, mq1, mq2);
+
+        // mq1 has penalty 0, should short-circuit
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(
+            mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? 0 : 30)
+        );
+
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex);
+
+        assertNotNull(result);
+        assertEquals(mq1, result.getLeft());
+        assertEquals(0, result.getRight().intValue());
+    }
+
+    /**
+     * Test selectLeastPenalty short-circuits when penalty is negative
+     */
+    @Test
+    public void testSelectLeastPenalty_ShortCircuitNegativePenalty() {
+        MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+        MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+        MessageQueue mq2 = new MessageQueue("topic", "broker", 2);
+        List<MessageQueue> queues = Arrays.asList(mq0, mq1, mq2);
+
+        // mq1 has penalty -5, should short-circuit
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(
+            mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? -5 : 30)
+        );
+
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex);
+
+        assertNotNull(result);
+        assertEquals(mq1, result.getLeft());
+        assertEquals(-5, result.getRight().intValue());
+    }
+
+    /**
+     * Test selectLeastPenalty with round-robin behavior (rotating start index)
+     * Verifies that startIndex affects the iteration order
+     */
+    @Test
+    public void testSelectLeastPenalty_RoundRobinStartIndex() {
+        MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+        MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+        MessageQueue mq2 = new MessageQueue("topic", "broker", 2);
+        List<MessageQueue> queues = Arrays.asList(mq0, mq1, mq2);
+
+        // All queues have penalty 0, so whichever is encountered first will 
be returned
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(mq -> 0);
+
+        // Starting from index 0
+        AtomicInteger startIndex1 = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result1 = 
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex1);
+        assertNotNull(result1);
+        assertEquals(mq0, result1.getLeft());
+
+        // Starting from index 1
+        AtomicInteger startIndex2 = new AtomicInteger(1);
+        Pair<MessageQueue, Integer> result2 = 
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex2);
+        assertNotNull(result2);
+        assertEquals(mq1, result2.getLeft());
+
+        // Starting from index 2
+        AtomicInteger startIndex3 = new AtomicInteger(2);
+        Pair<MessageQueue, Integer> result3 = 
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex3);
+        assertNotNull(result3);
+        assertEquals(mq2, result3.getLeft());
+    }
+
+    /**
+     * Test selectLeastPenalty increments startIndex for each iteration
+     */
+    @Test
+    public void testSelectLeastPenalty_IncrementStartIndex() {
+        MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+        MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+        MessageQueue mq2 = new MessageQueue("topic", "broker", 2);
+        List<MessageQueue> queues = Arrays.asList(mq0, mq1, mq2);
+
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(mq -> 10);
+
+        AtomicInteger startIndex = new AtomicInteger(0);
+        MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, 
startIndex);
+
+        // After iterating through 3 queues, startIndex should be incremented 
3 times
+        assertEquals(3, startIndex.get());
+    }
+
+    /**
+     * Test selectLeastPenalty handles startIndex wrapping with Math.floorMod
+     */
+    @Test
+    public void testSelectLeastPenalty_StartIndexWrapping() {
+        MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+        MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+        MessageQueue mq2 = new MessageQueue("topic", "broker", 2);
+        List<MessageQueue> queues = Arrays.asList(mq0, mq1, mq2);
+
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(mq -> 0);
+
+        // Start with large index to test wrapping
+        AtomicInteger startIndex = new AtomicInteger(100);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex);
+
+        assertNotNull(result);
+        // 100 % 3 = 1, so should start from mq1
+        assertEquals(mq1, result.getLeft());
+    }
+
+    /**
+     * Test selectLeastPenaltyWithPriority with null queuesWithPriority should 
return null
+     */
+    @Test
+    public void testSelectLeastPenaltyWithPriority_NullQueues() {
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(mq -> 10);
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+            null, penalizers, startIndex);
+        assertNull(result);
+    }
+
+    /**
+     * Test selectLeastPenaltyWithPriority with empty queuesWithPriority 
should return null
+     */
+    @Test
+    public void testSelectLeastPenaltyWithPriority_EmptyQueues() {
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(mq -> 10);
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+            Collections.emptyList(), penalizers, startIndex);
+        assertNull(result);
+    }
+
+    /**
+     * Test selectLeastPenaltyWithPriority with single priority group 
delegates to selectLeastPenalty
+     */
+    @Test
+    public void testSelectLeastPenaltyWithPriority_SinglePriorityGroup() {
+        MessageQueue mq0 = new MessageQueue("topic", "broker", 0);
+        MessageQueue mq1 = new MessageQueue("topic", "broker", 1);
+        List<MessageQueue> queues = Arrays.asList(mq0, mq1);
+
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(
+            mq -> mq.getQueueId() == 0 ? 20 : 10
+        );
+
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+            Collections.singletonList(queues), penalizers, startIndex);
+
+        assertNotNull(result);
+        assertEquals(mq1, result.getLeft());
+        assertEquals(10, result.getRight().intValue());
+    }
+
+    /**
+     * Test selectLeastPenaltyWithPriority selects queue with lowest penalty 
across multiple priority groups
+     */
+    @Test
+    public void testSelectLeastPenaltyWithPriority_MultiplePriorityGroups() {
+        // Priority group 1 (higher priority)
+        MessageQueue mq0 = new MessageQueue("topic", "broker-high", 0);
+        MessageQueue mq1 = new MessageQueue("topic", "broker-high", 1);
+        List<MessageQueue> highPriorityQueues = Arrays.asList(mq0, mq1);
+
+        // Priority group 2 (lower priority)
+        MessageQueue mq2 = new MessageQueue("topic", "broker-low", 0);
+        MessageQueue mq3 = new MessageQueue("topic", "broker-low", 1);
+        List<MessageQueue> lowPriorityQueues = Arrays.asList(mq2, mq3);
+
+        List<List<MessageQueue>> queuesWithPriority = 
Arrays.asList(highPriorityQueues, lowPriorityQueues);
+
+        // Assign penalties: high-priority queues have higher penalties, 
low-priority have lower
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(
+            mq -> mq.getBrokerName().equals("broker-high") ? 50 : 10
+        );
+
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+            queuesWithPriority, penalizers, startIndex);
+
+        assertNotNull(result);
+        // Should select from low-priority group because it has lower penalty
+        assertTrue(result.getLeft().getBrokerName().equals("broker-low"));
+        assertEquals(10, result.getRight().intValue());
+    }
+
+    /**
+     * Test selectLeastPenaltyWithPriority short-circuits when a priority 
group yields penalty <= 0
+     */
+    @Test
+    public void testSelectLeastPenaltyWithPriority_ShortCircuitZeroPenalty() {
+        // Priority group 1
+        MessageQueue mq0 = new MessageQueue("topic", "broker-high", 0);
+        List<MessageQueue> highPriorityQueues = Collections.singletonList(mq0);
+
+        // Priority group 2
+        MessageQueue mq1 = new MessageQueue("topic", "broker-low", 0);
+        List<MessageQueue> lowPriorityQueues = Collections.singletonList(mq1);
+
+        List<List<MessageQueue>> queuesWithPriority = 
Arrays.asList(highPriorityQueues, lowPriorityQueues);
+
+        // First group has penalty 0, should short-circuit
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(
+            mq -> mq.getBrokerName().equals("broker-high") ? 0 : 100
+        );
+
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+            queuesWithPriority, penalizers, startIndex);
+
+        assertNotNull(result);
+        assertEquals(mq0, result.getLeft());
+        assertEquals(0, result.getRight().intValue());
+    }
+
+    /**
+     * Test selectLeastPenaltyWithPriority when first group encounters zero 
penalty during iteration
+     */
+    @Test
+    public void testSelectLeastPenaltyWithPriority_FirstGroupHasZeroPenalty() {
+        // Priority group 1
+        MessageQueue mq0 = new MessageQueue("topic", "broker1", 0);
+        MessageQueue mq1 = new MessageQueue("topic", "broker1", 1);
+        List<MessageQueue> group1 = Arrays.asList(mq0, mq1);
+
+        // Priority group 2
+        MessageQueue mq2 = new MessageQueue("topic", "broker2", 0);
+        List<MessageQueue> group2 = Collections.singletonList(mq2);
+
+        List<List<MessageQueue>> queuesWithPriority = Arrays.asList(group1, 
group2);
+
+        // mq1 in first group has penalty 0
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(
+            mq -> mq.getQueueId() == 1 && mq.getBrokerName().equals("broker1") 
? 0 : 50
+        );
+
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+            queuesWithPriority, penalizers, startIndex);
+
+        assertNotNull(result);
+        assertEquals(mq1, result.getLeft());
+        assertEquals(0, result.getRight().intValue());
+    }
+
+    /**
+     * Test selectLeastPenaltyWithPriority returns first encountered minimum 
when multiple groups have same minimum penalty
+     */
+    @Test
+    public void testSelectLeastPenaltyWithPriority_SameMinimumPenalty() {
+        // Priority group 1
+        MessageQueue mq0 = new MessageQueue("topic", "broker1", 0);
+        List<MessageQueue> group1 = Collections.singletonList(mq0);
+
+        // Priority group 2
+        MessageQueue mq1 = new MessageQueue("topic", "broker2", 0);
+        List<MessageQueue> group2 = Collections.singletonList(mq1);
+
+        // Priority group 3
+        MessageQueue mq2 = new MessageQueue("topic", "broker3", 0);
+        List<MessageQueue> group3 = Collections.singletonList(mq2);
+
+        List<List<MessageQueue>> queuesWithPriority = Arrays.asList(group1, 
group2, group3);
+
+        // All have same penalty
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(mq -> 10);
+
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+            queuesWithPriority, penalizers, startIndex);
+
+        assertNotNull(result);
+        // Should return first encountered (from group1)
+        assertEquals(mq0, result.getLeft());
+        assertEquals(10, result.getRight().intValue());
+    }
+
+    /**
+     * Test selectLeastPenaltyWithPriority with complex scenario:
+     * Multiple priority groups with varying penalties
+     */
+    @Test
+    public void testSelectLeastPenaltyWithPriority_ComplexScenario() {
+        // Priority group 1: penalties 100, 90
+        MessageQueue mq0 = new MessageQueue("topic", "broker1", 0);
+        MessageQueue mq1 = new MessageQueue("topic", "broker1", 1);
+        List<MessageQueue> group1 = Arrays.asList(mq0, mq1);
+
+        // Priority group 2: penalties 50, 30
+        MessageQueue mq2 = new MessageQueue("topic", "broker2", 0);
+        MessageQueue mq3 = new MessageQueue("topic", "broker2", 1);
+        List<MessageQueue> group2 = Arrays.asList(mq2, mq3);
+
+        // Priority group 3: penalties 80, 20
+        MessageQueue mq4 = new MessageQueue("topic", "broker3", 0);
+        MessageQueue mq5 = new MessageQueue("topic", "broker3", 1);
+        List<MessageQueue> group3 = Arrays.asList(mq4, mq5);
+
+        List<List<MessageQueue>> queuesWithPriority = Arrays.asList(group1, 
group2, group3);
+
+        List<MessageQueuePenalizer<MessageQueue>> penalizers = 
Collections.singletonList(mq -> {
+            if (mq.getBrokerName().equals("broker1")) {
+                return mq.getQueueId() == 0 ? 100 : 90;
+            } else if (mq.getBrokerName().equals("broker2")) {
+                return mq.getQueueId() == 0 ? 50 : 30;
+            } else {
+                return mq.getQueueId() == 0 ? 80 : 20;
+            }
+        });
+
+        AtomicInteger startIndex = new AtomicInteger(0);
+        Pair<MessageQueue, Integer> result = 
MessageQueuePenalizer.selectLeastPenaltyWithPriority(
+            queuesWithPriority, penalizers, startIndex);
+
+        assertNotNull(result);
+        // Should select mq5 from group3 with penalty 20 (the global minimum)
+        assertEquals(mq5, result.getLeft());
+        assertEquals(20, result.getRight().intValue());
+    }
+}
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java
new file mode 100644
index 0000000000..22f2a68e8b
--- /dev/null
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.route;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class MessageQueuePriorityProviderTest {
+
+    @Test
+    public void testPriorityOfWithLambda() {
+        // The provider can be written as a lambda; here the priority is simply the queue ID
+        MessageQueuePriorityProvider<MessageQueue> provider = mq -> 
mq.getQueueId();
+        
+        MessageQueue queue1 = new MessageQueue("topic", "broker", 0);
+        MessageQueue queue2 = new MessageQueue("topic", "broker", 5);
+        MessageQueue queue3 = new MessageQueue("topic", "broker", 10);
+        
+        assertEquals(0, provider.priorityOf(queue1)); // priorityOf must echo each queue's ID
+        assertEquals(5, provider.priorityOf(queue2));
+        assertEquals(10, provider.priorityOf(queue3));
+    }
+
+    @Test
+    public void testPriorityOfWithConstantValue() {
+        // A constant provider ignores its argument and always answers 1
+        MessageQueuePriorityProvider<MessageQueue> constantProvider = mq -> 1;
+        
+        MessageQueue queue1 = new MessageQueue("topic1", "broker1", 0);
+        MessageQueue queue2 = new MessageQueue("topic2", "broker2", 5);
+        
+        assertEquals(1, constantProvider.priorityOf(queue1)); // the two queues differ in topic, broker and ID
+        assertEquals(1, constantProvider.priorityOf(queue2));
+    }
+
+    @Test
+    public void testPriorityOfBasedOnBrokerName() {
+        // Priority derived from the broker name: hashCode() % 10 (can be negative, since % keeps the sign of a negative hash)
+        MessageQueuePriorityProvider<MessageQueue> brokerProvider = 
+            mq -> mq.getBrokerName().hashCode() % 10;
+        
+        MessageQueue queue1 = new MessageQueue("topic", "broker-a", 0);
+        MessageQueue queue2 = new MessageQueue("topic", "broker-b", 0);
+        
+        int priority1 = brokerProvider.priorityOf(queue1);
+        int priority2 = brokerProvider.priorityOf(queue2);
+        
+        // Only determinism is checked: a repeated call for the same queue must return the same value
+        assertEquals(priority1, brokerProvider.priorityOf(queue1));
+        assertEquals(priority2, brokerProvider.priorityOf(queue2));
+    }
+
+    @Test
+    public void testBuildPriorityGroupsWithNullList() {
+        MessageQueuePriorityProvider<MessageQueue> provider = mq -> 0;
+        List<List<MessageQueue>> result = 
MessageQueuePriorityProvider.buildPriorityGroups(null, provider);
+        
+        assertNotNull(result); // a null queue list must produce an empty result, not null
+        assertTrue(result.isEmpty());
+    }
+
+    @Test
+    public void testBuildPriorityGroupsWithEmptyList() {
+        MessageQueuePriorityProvider<MessageQueue> provider = mq -> 0;
+        List<List<MessageQueue>> result = 
MessageQueuePriorityProvider.buildPriorityGroups(
+            Collections.emptyList(), provider);
+        
+        assertNotNull(result); // no queues in, no groups out
+        assertTrue(result.isEmpty());
+    }
+
+    @Test
+    public void testBuildPriorityGroupsWithSinglePriority() {
+        MessageQueuePriorityProvider<MessageQueue> provider = mq -> 0; // every queue gets priority 0
+        
+        List<MessageQueue> queues = Arrays.asList(
+            new MessageQueue("topic", "broker1", 0),
+            new MessageQueue("topic", "broker1", 1),
+            new MessageQueue("topic", "broker1", 2)
+        );
+        
+        List<List<MessageQueue>> result = 
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+        
+        assertNotNull(result);
+        assertEquals(1, result.size()); // one shared priority, so one group
+        assertEquals(3, result.get(0).size()); // and it holds all three queues
+    }
+
+    @Test
+    public void testBuildPriorityGroupsWithMultiplePriorities() {
+        // Priority by queue ID range: IDs below 2 -> 0 (high), 2-3 -> 1 (medium), 4 and above -> 2 (low)
+        MessageQueuePriorityProvider<MessageQueue> provider = mq -> {
+            if (mq.getQueueId() < 2) return 0; // High priority
+            if (mq.getQueueId() < 4) return 1; // Medium priority
+            return 2; // Low priority
+        };
+        
+        List<MessageQueue> queues = Arrays.asList(
+            new MessageQueue("topic", "broker", 0), // priority 0
+            new MessageQueue("topic", "broker", 1), // priority 0
+            new MessageQueue("topic", "broker", 2), // priority 1
+            new MessageQueue("topic", "broker", 3), // priority 1
+            new MessageQueue("topic", "broker", 4), // priority 2
+            new MessageQueue("topic", "broker", 5)  // priority 2
+        );
+        
+        List<List<MessageQueue>> result = 
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+        
+        assertNotNull(result);
+        assertEquals(3, result.size()); // three distinct priority values, so three groups
+        
+        // First group (highest priority 0): queue IDs 0 and 1, in input order
+        assertEquals(2, result.get(0).size());
+        assertEquals(0, result.get(0).get(0).getQueueId());
+        assertEquals(1, result.get(0).get(1).getQueueId());
+        
+        // Second group (medium priority 1): queue IDs 2 and 3, in input order
+        assertEquals(2, result.get(1).size());
+        assertEquals(2, result.get(1).get(0).getQueueId());
+        assertEquals(3, result.get(1).get(1).getQueueId());
+        
+        // Third group (low priority 2): queue IDs 4 and 5, in input order
+        assertEquals(2, result.get(2).size());
+        assertEquals(4, result.get(2).get(0).getQueueId());
+        assertEquals(5, result.get(2).get(1).getQueueId());
+    }
+
+    @Test
+    public void testBuildPriorityGroupsOrderedByPriority() {
+        // Groups must come back ordered from high to low priority (ascending 
numeric value)
+        MessageQueuePriorityProvider<MessageQueue> provider = mq -> 
mq.getQueueId();
+        
+        List<MessageQueue> queues = Arrays.asList(
+            new MessageQueue("topic", "broker", 5),
+            new MessageQueue("topic", "broker", 0),
+            new MessageQueue("topic", "broker", 3),
+            new MessageQueue("topic", "broker", 1)
+        );
+        
+        List<List<MessageQueue>> result = 
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+        
+        assertNotNull(result);
+        assertEquals(4, result.size()); // four distinct priorities, one queue each
+        
+        // Input order was 5, 0, 3, 1; the groups must come back as 0, 1, 3, 5 (ascending)
+        assertEquals(0, result.get(0).get(0).getQueueId());
+        assertEquals(1, result.get(1).get(0).getQueueId());
+        assertEquals(3, result.get(2).get(0).getQueueId());
+        assertEquals(5, result.get(3).get(0).getQueueId());
+    }
+
+    @Test
+    public void testBuildPriorityGroupsWithNegativePriorities() {
+        // Priority is queueId - 5, so the provider yields negative, zero and positive values
+        MessageQueuePriorityProvider<MessageQueue> provider = mq -> 
mq.getQueueId() - 5;
+        
+        List<MessageQueue> queues = Arrays.asList(
+            new MessageQueue("topic", "broker", 0), // priority -5
+            new MessageQueue("topic", "broker", 5), // priority 0
+            new MessageQueue("topic", "broker", 10) // priority 5
+        );
+        
+        List<List<MessageQueue>> result = 
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+        
+        assertNotNull(result);
+        assertEquals(3, result.size());
+        
+        // Groups ordered by priority -5, 0, 5 (ascending), i.e. queue IDs 0, 5, 10
+        assertEquals(0, result.get(0).get(0).getQueueId());
+        assertEquals(5, result.get(1).get(0).getQueueId());
+        assertEquals(10, result.get(2).get(0).getQueueId());
+    }
+
+    @Test
+    public void testBuildPriorityGroupsWithMixedBrokers() {
+        // Priority based on broker name: "broker-high" -> 0, "broker-medium" -> 1, anything else -> 2
+        MessageQueuePriorityProvider<MessageQueue> provider = mq -> {
+            if (mq.getBrokerName().equals("broker-high")) return 0;
+            if (mq.getBrokerName().equals("broker-medium")) return 1;
+            return 2;
+        };
+        
+        List<MessageQueue> queues = Arrays.asList(
+            new MessageQueue("topic", "broker-high", 0),
+            new MessageQueue("topic", "broker-low", 0),
+            new MessageQueue("topic", "broker-medium", 0),
+            new MessageQueue("topic", "broker-high", 1),
+            new MessageQueue("topic", "broker-medium", 1)
+        );
+        
+        List<List<MessageQueue>> result = 
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+        
+        assertNotNull(result);
+        assertEquals(3, result.size());
+        
+        // High priority group: both "broker-high" queues, although they were interleaved in the input
+        assertEquals(2, result.get(0).size());
+        assertEquals("broker-high", result.get(0).get(0).getBrokerName());
+        assertEquals("broker-high", result.get(0).get(1).getBrokerName());
+        
+        // Medium priority group (size is checked, but only the first element's broker is asserted)
+        assertEquals(2, result.get(1).size());
+        assertEquals("broker-medium", result.get(1).get(0).getBrokerName());
+        
+        // Low priority group: the single "broker-low" queue
+        assertEquals(1, result.get(2).size());
+        assertEquals("broker-low", result.get(2).get(0).getBrokerName());
+    }
+
+    @Test
+    public void testBuildPriorityGroupsPreservesQueueOrder() {
+        // Queues sharing a priority must keep their relative input order inside the group
+        MessageQueuePriorityProvider<MessageQueue> provider = mq -> 0;
+        
+        List<MessageQueue> queues = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            queues.add(new MessageQueue("topic", "broker", i));
+        }
+        
+        List<List<MessageQueue>> result = 
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+        
+        assertNotNull(result);
+        assertEquals(1, result.size());
+        assertEquals(10, result.get(0).size());
+        
+        // Position i of the single group must hold queue ID i, exactly as inserted
+        for (int i = 0; i < 10; i++) {
+            assertEquals(i, result.get(0).get(i).getQueueId());
+        }
+    }
+
+    @Test
+    public void testBuildPriorityGroupsWithCustomMessageQueue() {
+        // Grouping is exercised with a local MessageQueue subclass that carries its own priority
+        class CustomMessageQueue extends MessageQueue {
+            private int customPriority;
+            
+            public CustomMessageQueue(String topic, String brokerName, int 
queueId, int customPriority) {
+                super(topic, brokerName, queueId);
+                this.customPriority = customPriority;
+            }
+            
+            public int getCustomPriority() {
+                return customPriority;
+            }
+        }
+        
+        MessageQueuePriorityProvider<CustomMessageQueue> provider = 
+            CustomMessageQueue::getCustomPriority;
+        
+        List<CustomMessageQueue> queues = Arrays.asList(
+            new CustomMessageQueue("topic", "broker", 0, 2),
+            new CustomMessageQueue("topic", "broker", 1, 0),
+            new CustomMessageQueue("topic", "broker", 2, 1)
+        );
+        
+        List<List<CustomMessageQueue>> result = 
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+        
+        assertNotNull(result);
+        assertEquals(3, result.size());
+        
+        // Groups ordered by custom priority 0, 1, 2 although the input order was 2, 0, 1
+        assertEquals(0, result.get(0).get(0).getCustomPriority());
+        assertEquals(1, result.get(1).get(0).getCustomPriority());
+        assertEquals(2, result.get(2).get(0).getCustomPriority());
+    }
+
+    @Test
+    public void testBuildPriorityGroupsWithLargeNumberOfQueues() {
+        // 100 queues (IDs 0-99) spread over 5 priorities via queueId % 5
+        MessageQueuePriorityProvider<MessageQueue> provider = mq -> 
mq.getQueueId() % 5;
+        
+        List<MessageQueue> queues = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            queues.add(new MessageQueue("topic", "broker", i));
+        }
+        
+        List<List<MessageQueue>> result = 
MessageQueuePriorityProvider.buildPriorityGroups(queues, provider);
+        
+        assertNotNull(result);
+        assertEquals(5, result.size()); // 5 different priorities (0-4)
+        
+        // Each group should have 20 queues (100 / 5)
+        for (List<MessageQueue> group : result) {
+            assertEquals(20, group.size());
+        }
+    }
+}
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
index d150f87c40..e44ed28f4a 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java
@@ -30,12 +30,12 @@ public class MessageQueueSelectorTest extends 
BaseServiceTest {
     public void testReadMessageQueue() {
         queueData.setPerm(PermName.PERM_READ);
         queueData.setReadQueueNums(0);
-        MessageQueueSelector messageQueueSelector = new 
MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true);
+        MessageQueueSelector messageQueueSelector = new 
MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true);
         assertTrue(messageQueueSelector.getQueues().isEmpty());
 
         queueData.setPerm(PermName.PERM_READ);
         queueData.setReadQueueNums(3);
-        messageQueueSelector = new MessageQueueSelector(new 
TopicRouteWrapper(topicRouteData, TOPIC), null, true);
+        messageQueueSelector = new MessageQueueSelector(new 
TopicRouteWrapper(topicRouteData, TOPIC), true);
         assertEquals(3, messageQueueSelector.getQueues().size());
         assertEquals(1, messageQueueSelector.getBrokerActingQueues().size());
         for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) {
@@ -58,12 +58,12 @@ public class MessageQueueSelectorTest extends 
BaseServiceTest {
     public void testWriteMessageQueue() {
         queueData.setPerm(PermName.PERM_WRITE);
         queueData.setReadQueueNums(0);
-        MessageQueueSelector messageQueueSelector = new 
MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false);
+        MessageQueueSelector messageQueueSelector = new 
MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false);
         assertTrue(messageQueueSelector.getQueues().isEmpty());
 
         queueData.setPerm(PermName.PERM_WRITE);
         queueData.setWriteQueueNums(3);
-        messageQueueSelector = new MessageQueueSelector(new 
TopicRouteWrapper(topicRouteData, TOPIC), null, false);
+        messageQueueSelector = new MessageQueueSelector(new 
TopicRouteWrapper(topicRouteData, TOPIC), false);
         assertEquals(3, messageQueueSelector.getQueues().size());
         assertEquals(1, messageQueueSelector.getBrokerActingQueues().size());
         for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) {

Reply via email to