vongosling closed pull request #109: [ROCKETMQ-203]Support client to allocate 
message queue in machine room nearby priority
URL: https://github.com/apache/rocketmq/pull/109
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
new file mode 100644
index 000000000..9b166e7a9
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+/**
+ * An allocate strategy proxy for based on machine room nearside priority. An 
actual allocate strategy can be
+ * specified.
+ *
+ * If any consumer is alive in a machine room, the message queue of the broker 
which is deployed in the same machine
+ * should only be allocated to those. Otherwise, those message queues can be 
shared along all consumers since there are
+ * no alive consumer to monopolize them.
+ */
+public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy 
{
+    private final Logger log = ClientLogger.getLog();
+
+    private final AllocateMessageQueueStrategy 
allocateMessageQueueStrategy;//actual allocate strategy
+    private final MachineRoomResolver machineRoomResolver;
+
+    public AllocateMachineRoomNearby(AllocateMessageQueueStrategy 
allocateMessageQueueStrategy,
+        MachineRoomResolver machineRoomResolver) throws NullPointerException {
+        if (allocateMessageQueueStrategy == null) {
+            throw new NullPointerException("allocateMessageQueueStrategy is 
null");
+        }
+
+        if (machineRoomResolver == null) {
+            throw new NullPointerException("machineRoomResolver is null");
+        }
+
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+        this.machineRoomResolver = machineRoomResolver;
+    }
+
+    @Override
+    public List<MessageQueue> allocate(String consumerGroup, String 
currentCID, List<MessageQueue> mqAll,
+        List<String> cidAll) {
+        if (currentCID == null || currentCID.length() < 1) {
+            throw new IllegalArgumentException("currentCID is empty");
+        }
+        if (mqAll == null || mqAll.isEmpty()) {
+            throw new IllegalArgumentException("mqAll is null or mqAll empty");
+        }
+        if (cidAll == null || cidAll.isEmpty()) {
+            throw new IllegalArgumentException("cidAll is null or cidAll 
empty");
+        }
+
+        List<MessageQueue> result = new ArrayList<MessageQueue>();
+        if (!cidAll.contains(currentCID)) {
+            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in 
cidAll: {}",
+                consumerGroup,
+                currentCID,
+                cidAll);
+            return result;
+        }
+
+        //group mq by machine room
+        Map<String/*machine room */, List<MessageQueue>> mr2Mq = new 
TreeMap<String, List<MessageQueue>>();
+        for (MessageQueue mq : mqAll) {
+            String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
+            if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
+                if (mr2Mq.get(brokerMachineRoom) == null) {
+                    mr2Mq.put(brokerMachineRoom, new 
ArrayList<MessageQueue>());
+                }
+                mr2Mq.get(brokerMachineRoom).add(mq);
+            } else {
+                throw new IllegalArgumentException("Machine room is null for 
mq " + mq);
+            }
+        }
+
+        //group consumer by machine room
+        Map<String/*machine room */, List<String/*clientId*/>> mr2c = new 
TreeMap<String, List<String>>();
+        for (String cid : cidAll) {
+            String consumerMachineRoom = 
machineRoomResolver.consumerDeployIn(cid);
+            if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
+                if (mr2c.get(consumerMachineRoom) == null) {
+                    mr2c.put(consumerMachineRoom, new ArrayList<String>());
+                }
+                mr2c.get(consumerMachineRoom).add(cid);
+            } else {
+                throw new IllegalArgumentException("Machine room is null for 
consumer id " + cid);
+            }
+        }
+
+        List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
+
+        //1.allocate the mq that deploy in the same machine room with the 
current consumer
+        String currentMachineRoom = 
machineRoomResolver.consumerDeployIn(currentCID);
+        List<MessageQueue> mqInThisMachineRoom = 
mr2Mq.remove(currentMachineRoom);
+        List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
+        if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
+            
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, 
currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
+        }
+
+        //2.allocate the rest mq to each machine room if there are no consumer 
alive in that machine room
+        for (String machineRoom : mr2Mq.keySet()) {
+            if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the 
corresponding machine room, so all consumers share these queues
+                
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, 
currentCID, mr2Mq.get(machineRoom), cidAll));
+            }
+        }
+
+        return allocateResults;
+    }
+
+    @Override
+    public String getName() {
+        return "MACHINE_ROOM_NEARBY" + "-" + 
allocateMessageQueueStrategy.getName();
+    }
+
+    /**
+     * A resolver object to determine which machine room do the message queues 
or clients are deployed in.
+     *
+     * AllocateMachineRoomNearby will use the results to group the message 
queues and clients by machine room.
+     *
+     * The result returned from the implemented method CANNOT be null.
+     */
+    public interface MachineRoomResolver {
+        String brokerDeployIn(MessageQueue messageQueue);
+
+        String consumerDeployIn(String clientID);
+    }
+}
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
new file mode 100644
index 000000000..0d394c382
--- /dev/null
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class AllocateMachineRoomNearByTest {
+
+    private static final String CID_PREFIX = "CID-";
+
+    private final String topic = "topic_test";
+    private final AllocateMachineRoomNearby.MachineRoomResolver 
machineRoomResolver =  new AllocateMachineRoomNearby.MachineRoomResolver() {
+        @Override public String brokerDeployIn(MessageQueue messageQueue) {
+            return messageQueue.getBrokerName().split("-")[0];
+        }
+
+        @Override public String consumerDeployIn(String clientID) {
+            return clientID.split("-")[0];
+        }
+    };
+    private final AllocateMessageQueueStrategy allocateMessageQueueStrategy = 
new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), 
machineRoomResolver);
+
+
+    @Before
+    public void init() {
+    }
+
+
+    @Test
+    public void test1() {
+        testWhenIDCSizeEquals(5,20,10, false);
+        testWhenIDCSizeEquals(5,20,20, false);
+        testWhenIDCSizeEquals(5,20,30, false);
+        testWhenIDCSizeEquals(5,20,0, false );
+    }
+
+    @Test
+    public void test2() {
+        testWhenConsumerIDCIsMore(5,1,10, 10, false);
+        testWhenConsumerIDCIsMore(5,1,10, 5, false);
+        testWhenConsumerIDCIsMore(5,1,10, 20, false);
+        testWhenConsumerIDCIsMore(5,1,10, 0, false);
+    }
+
+    @Test
+    public void test3() {
+        testWhenConsumerIDCIsLess(5,2,10, 10, false);
+        testWhenConsumerIDCIsLess(5,2,10, 5, false);
+        testWhenConsumerIDCIsLess(5,2,10, 20, false);
+        testWhenConsumerIDCIsLess(5,2,10, 0, false);
+    }
+
+
+    @Test
+    public void testRun10RandomCase(){
+        for(int i=0;i<10;i++){
+            int consumerSize = new Random().nextInt(200)+1;//1-200
+            int queueSize = new Random().nextInt(100)+1;//1-100
+            int brokerIDCSize = new Random().nextInt(10)+1;//1-10
+            int consumerIDCSize = new Random().nextInt(10)+1;//1-10
+
+            if (brokerIDCSize == consumerIDCSize) {
+                
testWhenIDCSizeEquals(brokerIDCSize,queueSize,consumerSize,false);
+            }
+            else if (brokerIDCSize > consumerIDCSize) {
+                testWhenConsumerIDCIsLess(brokerIDCSize,brokerIDCSize- 
consumerIDCSize, queueSize, consumerSize, false);
+            } else {
+                testWhenConsumerIDCIsMore(brokerIDCSize, consumerIDCSize - 
brokerIDCSize, queueSize, consumerSize, false);
+            }
+        }
+    }
+
+
+
+
+    public void testWhenIDCSizeEquals(int IDCSize, int queueSize, int 
consumerSize, boolean print) {
+        if (print) {
+            System.out.println("Test : IDCSize = "+ IDCSize +"queueSize = " + 
queueSize +" consumerSize = " + consumerSize);
+        }
+        List<String> cidAll = prepareConsumer(IDCSize, consumerSize);
+        List<MessageQueue> mqAll = prepareMQ(IDCSize, queueSize);
+        List<MessageQueue> resAll = new ArrayList<MessageQueue>();
+        for (String currentID : cidAll) {
+            List<MessageQueue> res = 
allocateMessageQueueStrategy.allocate("Test-C-G",currentID,mqAll,cidAll);
+            if (print) {
+                System.out.println("cid: "+currentID+"--> res :" +res);
+            }
+            for (MessageQueue mq : res) {
+                
Assert.assertTrue(machineRoomResolver.brokerDeployIn(mq).equals(machineRoomResolver.consumerDeployIn(currentID)));
+            }
+            resAll.addAll(res);
+        }
+        Assert.assertTrue(hasAllocateAllQ(cidAll,mqAll,resAll));
+
+        if (print) {
+            
System.out.println("-------------------------------------------------------------------");
+        }
+    }
+
+    public void testWhenConsumerIDCIsMore(int brokerIDCSize, int consumerMore, 
int queueSize, int consumerSize, boolean print) {
+        if (print) {
+            System.out.println("Test : IDCSize = "+ brokerIDCSize +" queueSize 
= " + queueSize +" consumerSize = " + consumerSize);
+        }
+        Set<String> brokerIDCWithConsumer = new TreeSet<String>();
+        List<String> cidAll = prepareConsumer(brokerIDCSize +consumerMore, 
consumerSize);
+        List<MessageQueue> mqAll = prepareMQ(brokerIDCSize, queueSize);
+        for (MessageQueue mq : mqAll) {
+            brokerIDCWithConsumer.add(machineRoomResolver.brokerDeployIn(mq));
+        }
+
+        List<MessageQueue> resAll = new ArrayList<MessageQueue>();
+        for (String currentID : cidAll) {
+            List<MessageQueue> res = 
allocateMessageQueueStrategy.allocate("Test-C-G",currentID,mqAll,cidAll);
+            if (print) {
+                System.out.println("cid: "+currentID+"--> res :" +res);
+            }
+            for (MessageQueue mq : res) {
+                if 
(brokerIDCWithConsumer.contains(machineRoomResolver.brokerDeployIn(mq))) 
{//healthy idc, so only consumer in this idc should be allocated
+                    
Assert.assertTrue(machineRoomResolver.brokerDeployIn(mq).equals(machineRoomResolver.consumerDeployIn(currentID)));
+                }
+            }
+            resAll.addAll(res);
+        }
+
+        Assert.assertTrue(hasAllocateAllQ(cidAll,mqAll,resAll));
+        if (print) {
+            
System.out.println("-------------------------------------------------------------------");
+        }
+    }
+
+    public void testWhenConsumerIDCIsLess(int brokerIDCSize, int 
consumerIDCLess, int queueSize, int consumerSize, boolean print) {
+        if (print) {
+            System.out.println("Test : IDCSize = "+ brokerIDCSize +" queueSize 
= " + queueSize +" consumerSize = " + consumerSize);
+        }
+        Set<String> healthyIDC = new TreeSet<String>();
+        List<String> cidAll = prepareConsumer(brokerIDCSize - consumerIDCLess, 
consumerSize);
+        List<MessageQueue> mqAll = prepareMQ(brokerIDCSize, queueSize);
+        for (String cid : cidAll) {
+            healthyIDC.add(machineRoomResolver.consumerDeployIn(cid));
+        }
+
+        List<MessageQueue> resAll = new ArrayList<MessageQueue>();
+        Map<String, List<MessageQueue>> idc2Res = new TreeMap<String, 
List<MessageQueue>>();
+        for (String currentID : cidAll) {
+            String currentIDC = 
machineRoomResolver.consumerDeployIn(currentID);
+            List<MessageQueue> res = 
allocateMessageQueueStrategy.allocate("Test-C-G",currentID,mqAll,cidAll);
+            if (print) {
+                System.out.println("cid: "+currentID+"--> res :" +res);
+            }
+            if ( !idc2Res.containsKey(currentIDC)) {
+                idc2Res.put(currentIDC, new ArrayList<MessageQueue>());
+            }
+            idc2Res.get(currentIDC).addAll(res);
+            resAll.addAll(res);
+        }
+
+        for (String consumerIDC : healthyIDC) {
+            List<MessageQueue> resInOneIDC = idc2Res.get(consumerIDC);
+            List<MessageQueue> mqInThisIDC = 
createMessageQueueList(consumerIDC,queueSize);
+            Assert.assertTrue(resInOneIDC.containsAll(mqInThisIDC));
+        }
+
+        Assert.assertTrue(hasAllocateAllQ(cidAll,mqAll,resAll));
+        if (print) {
+            
System.out.println("-------------------------------------------------------------------");
+        }
+    }
+
+
+    private boolean hasAllocateAllQ(List<String> cidAll,List<MessageQueue> 
mqAll, List<MessageQueue> allocatedResAll) {
+        if (cidAll.isEmpty()){
+            return allocatedResAll.isEmpty();
+        }
+        return mqAll.containsAll(allocatedResAll) && 
allocatedResAll.containsAll(mqAll) && mqAll.size() == allocatedResAll.size();
+    }
+
+
+    private List<String> createConsumerIdList(String machineRoom, int size) {
+        List<String> consumerIdList = new ArrayList<String>(size);
+        for (int i = 0; i < size; i++) {
+            consumerIdList.add(machineRoom +"-"+CID_PREFIX + 
String.valueOf(i));
+        }
+        return consumerIdList;
+    }
+
+    private List<MessageQueue> createMessageQueueList(String machineRoom, int 
size) {
+        List<MessageQueue> messageQueueList = new 
ArrayList<MessageQueue>(size);
+        for (int i = 0; i < size; i++) {
+            MessageQueue mq = new MessageQueue(topic, 
machineRoom+"-brokerName", i);
+            messageQueueList.add(mq);
+        }
+        return messageQueueList;
+    }
+
+    private List<MessageQueue> prepareMQ(int brokerIDCSize, int queueSize) {
+        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+        for (int i=1;i<=brokerIDCSize;i++) {
+            mqAll.addAll(createMessageQueueList("IDC"+i, queueSize));
+        }
+
+        return mqAll;
+    }
+
+    private List<String> prepareConsumer( int IDCSize, int consumerSize) {
+        List<String> cidAll = new ArrayList<String>();
+        for (int i=1;i<=IDCSize;i++) {
+            cidAll.addAll(createConsumerIdList("IDC"+i, consumerSize));
+        }
+        return cidAll;
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to