[
https://issues.apache.org/jira/browse/ROCKETMQ-203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16032895#comment-16032895
]
ASF GitHub Bot commented on ROCKETMQ-203:
-----------------------------------------
Github user lizhanhui commented on a diff in the pull request:
https://github.com/apache/incubator-rocketmq/pull/109#discussion_r119598779
--- Diff:
client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+/**
+ * An allocate strategy proxy based on machine room nearside priority.
An actual allocate strategy can be
+ * specified.
+ *
+ * If any consumer is alive in a machine room, the message queues of the
brokers which are deployed in the same machine
+ * room should only be allocated to those consumers. Otherwise, those message queues can
be shared among all consumers since there is
+ * no alive consumer to monopolize them.
+ */
+public class AllocateMachineRoomNearby implements
AllocateMessageQueueStrategy {
+ private final Logger log = ClientLogger.getLog();
+
+ private final AllocateMessageQueueStrategy
allocateMessageQueueStrategy;//actual allocate strategy
+ private final MachineRoomSelector machineRoomSelector;
+
+ public AllocateMachineRoomNearby(AllocateMessageQueueStrategy
allocateMessageQueueStrategy,
+ MachineRoomSelector machineRoomSelector) {
+ this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+ this.machineRoomSelector = machineRoomSelector;
+ }
+
+ @Override
+ public List<MessageQueue> allocate(String consumerGroup, String
currentCID, List<MessageQueue> mqAll,
+ List<String> cidAll) {
+ if (currentCID == null || currentCID.length() < 1) {
+ throw new IllegalArgumentException("currentCID is empty");
+ }
+ if (mqAll == null || mqAll.isEmpty()) {
+ throw new IllegalArgumentException("mqAll is null or mqAll
empty");
+ }
+ if (cidAll == null || cidAll.isEmpty()) {
+ throw new IllegalArgumentException("cidAll is null or cidAll
empty");
+ }
+
+ List<MessageQueue> result = new ArrayList<MessageQueue>();
+ if (!cidAll.contains(currentCID)) {
+ log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in
cidAll: {}",
+ consumerGroup,
+ currentCID,
+ cidAll);
+ return result;
+ }
+
+ //group mq by machine room
+ Map<String, List<MessageQueue>> mr2Mq = new TreeMap<String,
List<MessageQueue>>();
+ for (MessageQueue mq : mqAll) {
+ String brokerMachineRoom =
machineRoomSelector.brokerDeployIn(mq);
+ if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
+ if (mr2Mq.get(brokerMachineRoom) == null) {
+ mr2Mq.put(brokerMachineRoom, new
ArrayList<MessageQueue>());
+ }
+ mr2Mq.get(brokerMachineRoom).add(mq);
+ } else {
+ throw new IllegalArgumentException("Machine room is null
for mq " + mq);
+ }
+ }
+
+ //group consumer by machine room
+ Map<String, List<String>> mr2c = new TreeMap<String,
List<String>>();
+ for (String cid : cidAll) {
+ String consumerMachineRoom =
machineRoomSelector.consumerDeployIn(cid);
+ if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
+ if (mr2c.get(consumerMachineRoom) == null) {
+ mr2c.put(consumerMachineRoom, new ArrayList<String>());
+ }
+ mr2c.get(consumerMachineRoom).add(cid);
+ } else {
+ throw new IllegalArgumentException("Machine room is null
for consumer id " + cid);
+ }
+ }
+
+ List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
+
+ //1.allocate the mq that deploy in the same machine room with the
current consumer
+ String currentMachineRoom =
machineRoomSelector.consumerDeployIn(currentCID);
+ List<MessageQueue> mqInThisMachineRoom =
mr2Mq.remove(currentMachineRoom);
+ List<String> consumerInThisMachineRoom =
mr2c.get(currentMachineRoom);
+ if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty())
{
+
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup,
currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
+ }
+
+ //2.allocate the rest mq to each machine room if there are no
consumer alive in that machine room
+ for (String machineRoom : mr2Mq.keySet()) {
+ if (!mr2c.containsKey(machineRoom)) { // no alive consumer in
the corresponding machine room, so all consumers share these queues
+
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup,
currentCID, mr2Mq.get(machineRoom), cidAll));
+ }
+ }
+
+ return allocateResults;
+ }
+
+ @Override
+ public String getName() {
+ return "MACHINE_ROOM_NEARBY" + "-" +
allocateMessageQueueStrategy.getName();
+ }
+
+ public interface MachineRoomSelector {
--- End diff --
How about renaming this interface to `MachineRoomResolver`?
> Support client to allocate message queue in machine room nearby priority
> ------------------------------------------------------------------------
>
> Key: ROCKETMQ-203
> URL: https://issues.apache.org/jira/browse/ROCKETMQ-203
> Project: Apache RocketMQ
> Issue Type: New Feature
> Reporter: Jaskey Lam
> Assignee: vongosling
> Attachments: AVLCB%1XMJ}T_YXP[OPF){T.png
>
>
> As the picture shows, in a multi-machine-room architecture, the consumers may be
> deployed in different IDCs, and so may the brokers.
> But the user may possibly need to consume messages from their own IDC since it
> is nearer and faster. But when a problem occurs, say all consumers in one IDC have
> crashed, the messages can also be consumed by other consumers in other IDCs.
> This is a very important feature for many users, but currently, if the user
> needs this feature, they must implement their own allocate strategy which is
> not easy and error prone.
> Also, this feature should not conflict with the existing allocate strategies;
> that means the user can still choose their preferred allocate strategy, say
> AVERAGE or AVERAGE_BY_CIRCLE, but nearby machine room will be supported
> easily too.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)