[ 
https://issues.apache.org/jira/browse/ROCKETMQ-203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16033319#comment-16033319
 ] 

ASF GitHub Bot commented on ROCKETMQ-203:
-----------------------------------------

Github user Jaskey commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/109#discussion_r119676071
  
    --- Diff: 
client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
 ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.rocketmq.client.consumer.rebalance;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TreeMap;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
    +import org.apache.rocketmq.client.log.ClientLogger;
    +import org.apache.rocketmq.common.message.MessageQueue;
    +import org.slf4j.Logger;
    +
    +/**
    + * An allocate strategy proxy for based on machine room nearside priority. 
An actual allocate strategy can be
    + * specified.
    + *
    + * If any consumer is alive in a machine room, the message queue of the 
broker which is deployed in the same machine
    + * should only be allocated to those. Otherwise, those message queues can 
be shared along all consumers since there are
    + * no alive consumer to monopolize them.
    + */
    +public class AllocateMachineRoomNearby implements 
AllocateMessageQueueStrategy {
    +    private final Logger log = ClientLogger.getLog();
    +
    +    private final AllocateMessageQueueStrategy 
allocateMessageQueueStrategy;//actual allocate strategy
    +    private final MachineRoomSelector machineRoomSelector;
    +
    +    public AllocateMachineRoomNearby(AllocateMessageQueueStrategy 
allocateMessageQueueStrategy,
    +        MachineRoomSelector machineRoomSelector) {
    +        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    +        this.machineRoomSelector = machineRoomSelector;
    +    }
    +
    +    @Override
    +    public List<MessageQueue> allocate(String consumerGroup, String 
currentCID, List<MessageQueue> mqAll,
    +        List<String> cidAll) {
    +        if (currentCID == null || currentCID.length() < 1) {
    +            throw new IllegalArgumentException("currentCID is empty");
    +        }
    +        if (mqAll == null || mqAll.isEmpty()) {
    +            throw new IllegalArgumentException("mqAll is null or mqAll 
empty");
    +        }
    +        if (cidAll == null || cidAll.isEmpty()) {
    +            throw new IllegalArgumentException("cidAll is null or cidAll 
empty");
    +        }
    +
    +        List<MessageQueue> result = new ArrayList<MessageQueue>();
    +        if (!cidAll.contains(currentCID)) {
    +            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in 
cidAll: {}",
    +                consumerGroup,
    +                currentCID,
    +                cidAll);
    +            return result;
    +        }
    +
    +        //group mq by machine room
    +        Map<String, List<MessageQueue>> mr2Mq = new TreeMap<String, 
List<MessageQueue>>();
    +        for (MessageQueue mq : mqAll) {
    +            String brokerMachineRoom = 
machineRoomSelector.brokerDeployIn(mq);
    +            if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
    +                if (mr2Mq.get(brokerMachineRoom) == null) {
    +                    mr2Mq.put(brokerMachineRoom, new 
ArrayList<MessageQueue>());
    +                }
    +                mr2Mq.get(brokerMachineRoom).add(mq);
    +            } else {
    +                throw new IllegalArgumentException("Machine room is null 
for mq " + mq);
    +            }
    +        }
    +
    +        //group consumer by machine room
    +        Map<String, List<String>> mr2c = new TreeMap<String, 
List<String>>();
    +        for (String cid : cidAll) {
    +            String consumerMachineRoom = 
machineRoomSelector.consumerDeployIn(cid);
    +            if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
    +                if (mr2c.get(consumerMachineRoom) == null) {
    +                    mr2c.put(consumerMachineRoom, new ArrayList<String>());
    +                }
    +                mr2c.get(consumerMachineRoom).add(cid);
    +            } else {
    +                throw new IllegalArgumentException("Machine room is null 
for consumer id " + cid);
    +            }
    +        }
    +
    +        List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
    +
    +        //1.allocate the mq that deploy in the same machine room with the 
current consumer
    +        String currentMachineRoom = 
machineRoomSelector.consumerDeployIn(currentCID);
    +        List<MessageQueue> mqInThisMachineRoom = 
mr2Mq.remove(currentMachineRoom);
    +        List<String> consumerInThisMachineRoom = 
mr2c.get(currentMachineRoom);
    +        if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) 
{
    +            
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, 
currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
    +        }
    +
    +        //2.allocate the rest mq to each machine room if there are no 
consumer alive in that machine room
    +        for (String machineRoom : mr2Mq.keySet()) {
    +            if (!mr2c.containsKey(machineRoom)) { // no alive consumer in 
the corresponding machine room, so all consumers share these queues
    +                
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, 
currentCID, mr2Mq.get(machineRoom), cidAll));
    +            }
    +        }
    +
    +        return allocateResults;
    +    }
    +
    +    @Override
    +    public String getName() {
    +        return "MACHINE_ROOM_NEARBY" + "-" + 
allocateMessageQueueStrategy.getName();
    +    }
    +
    +    public interface MachineRoomSelector {
    --- End diff --
    
    LGTM. I will rename this interface if more people agree.


> Support client to allocate message queue in machine room nearby priority
> ------------------------------------------------------------------------
>
>                 Key: ROCKETMQ-203
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-203
>             Project: Apache RocketMQ
>          Issue Type: New Feature
>            Reporter: Jaskey Lam
>            Assignee: vongosling
>         Attachments: AVLCB%1XMJ}T_YXP[OPF){T.png
>
>
> As the picture shows, in a multi-machine-room architecture, the consumers may be 
> deployed in different IDCs, and so may the brokers. 
> But the user may possibly need to consume messages from its own IDC since it 
> is nearer and faster. But when a problem occurs, say all consumers in one IDC have 
> crashed, the messages can also be consumed by other consumers in other IDCs.
> This is a very important feature for many users, but currently, if the user 
> needs this feature, they must implement their own allocate strategy which is 
> not easy and error prone.
> And this feature should not conflict with the existing allocate strategies; 
> that means the user can still choose their preferred allocate strategy, say 
> AVERAGE or AVERAGE_BY_CIRCLE, but nearby machine room will be supported 
> easily too.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to