msqfwj opened a new issue, #8578:
URL: https://github.com/apache/rocketmq/issues/8578
### Before Creating the Enhancement Request
- [X] I have confirmed that this should be classified as an enhancement
rather than a bug/feature.
### Summary
当手动指定QueueId来生产消息时候,会出现各队列消息负载不均衡的情况
### Motivation
如果出现某一个队列负载过重的情况,会导致一个消费者组,其他消费者都怠工。而这个队列对应的消费者会消费不及时。
### Describe the Solution You'd Like
我已经设计了一个新的负载均衡算法,在broker端缓存所有采用新AllocateMessageQueueStrategy实现类对应主题以及消费者组。
定期统计他们有多少个队列,把每个队列剩余未消费消息数作为权重使用优先级队列进行分配。
下面为优先级队列分配队列的算法实现
` public Map<String, List<MessageQueue>> distributeItems(List<String>
cidAll, Map<Integer, Long> countMap, List<MessageQueue> queues) {
// 使用优先队列存储每个对象和他们的当前总分数
PriorityQueue<Consumer> pq = new PriorityQueue<>((a, b) -> (int)
(a.score - b.score));
for (String cid : cidAll) {
pq.add(new Consumer(cid));
}
// 对物品分数进行降序排序
List<Map.Entry<Integer, Long>> list = new
ArrayList<>(countMap.entrySet());
list.sort((a, b) -> (int) (b.getValue() - a.getValue()));
System.out.println("总分数为" + list);
// 分配物品
for (Map.Entry<Integer, Long> entry : list) {
// 获取当前总分数最小的对象
Consumer current = pq.poll();
// 更新总分数
current.score += entry.getValue();
// 记录分配的queue
for (MessageQueue queue : queues) {
if (queue.getQueueId() == entry.getKey()) {
current.queues.add(queue);
break;
}
}
// 将对象放回优先队列
pq.add(current);
}
// 构建结果
Map<String, List<MessageQueue>> result = new HashMap<>();
while (!pq.isEmpty()) {
Consumer current = pq.poll();
result.put(current.cid, current.queues);
System.out.println(current.cid + " 分配了" + current.score + " 队列 "
+
current.queues.stream().map(MessageQueue::getQueueId).map(String::valueOf).collect(Collectors.joining(",")));
}
return result;
}`
当每次消费端负载均衡请求到达broker返回该缓存结果。同时会对该缓存读写时候增加读写锁保护,当缓存尚未初始化的时候。默认返回默认负载均衡算法的计算结果。
测试时候是能保证每个cid能分配到相近平均数的未来预期需要消费的消费数的。

### Describe Alternatives You've Considered
该种实现方案有如下弊端
1实现比较耦合,业务和mq本身贴合了
2 在计算时期可能会出现不同消费者分配到相同队列的情况,需要消费端做好幂等
3
因为依赖broker的大量方法,例如获取消息数等等。所以该实现类必须放在broker包中而不是其他AllocateMessageQueueStrategy实现类所在模块。
还有一种实现方式,在client端请求负载均衡时候,不在缓存,直接实时计算返回。
但是这样可能导致哪怕没有消费者变动。也会出现频繁的变更负责的队列。
### Additional Context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]