This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 13765b040 [ISSUE #3940]Optimize AllocateMessageQueueStrategy (#3941)
13765b040 is described below
commit 13765b040a69f81bde5d66e98c1104efde86819a
Author: zhangjidi2016 <[email protected]>
AuthorDate: Wed Apr 6 22:28:49 2022 +0800
[ISSUE #3940]Optimize AllocateMessageQueueStrategy (#3941)
* [ISSUE #3940]Optimize AllocateMessageQueueStrategy
* remove the check in AllocateMessageQueueByConfig
Co-authored-by: zhangjidi <[email protected]>
---
...a => AbstractAllocateMessageQueueStrategy.java} | 37 ++++------
.../rebalance/AllocateMachineRoomNearby.java | 20 +-----
.../rebalance/AllocateMessageQueueAveragely.java | 21 +-----
.../AllocateMessageQueueAveragelyByCircle.java | 21 +-----
.../rebalance/AllocateMessageQueueByConfig.java | 3 +-
.../AllocateMessageQueueByMachineRoom.java | 18 ++---
.../AllocateMessageQueueConsistentHash.java | 22 +-----
.../AllocateMessageQueueByConfigTest.java | 65 +++++++++++++++++
.../AllocateMessageQueueByMachineRoomTest.java | 82 ++++++++++++++++++++++
9 files changed, 173 insertions(+), 116 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AbstractAllocateMessageQueueStrategy.java
similarity index 66%
copy from
client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
copy to
client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AbstractAllocateMessageQueueStrategy.java
index fe78f0a6b..22ba5060c 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AbstractAllocateMessageQueueStrategy.java
@@ -14,54 +14,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.client.consumer.rebalance;
-import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+
+public abstract class AbstractAllocateMessageQueueStrategy implements
AllocateMessageQueueStrategy {
-/**
- * Cycle average Hashing queue algorithm
- */
-public class AllocateMessageQueueAveragelyByCircle implements
AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String
currentCID, List<MessageQueue> mqAll,
+ public boolean check(String consumerGroup, String currentCID,
List<MessageQueue> mqAll,
List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
+ if (StringUtils.isEmpty(currentCID)) {
throw new IllegalArgumentException("currentCID is empty");
}
- if (mqAll == null || mqAll.isEmpty()) {
+ if (CollectionUtils.isEmpty(mqAll)) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
- if (cidAll == null || cidAll.isEmpty()) {
+ if (CollectionUtils.isEmpty(cidAll)) {
throw new IllegalArgumentException("cidAll is null or cidAll
empty");
}
- List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in
cidAll: {}",
consumerGroup,
currentCID,
cidAll);
- return result;
- }
-
- int index = cidAll.indexOf(currentCID);
- for (int i = index; i < mqAll.size(); i++) {
- if (i % cidAll.size() == index) {
- result.add(mqAll.get(i));
- }
+ return false;
}
- return result;
- }
- @Override
- public String getName() {
- return "AVG_BY_CIRCLE";
+ return true;
}
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
index 8e9267459..415a5fa51 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
@@ -23,9 +23,7 @@ import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.logging.InternalLogger;
/**
* An allocate strategy proxy for based on machine room nearside priority. An
actual allocate strategy can be
@@ -35,8 +33,7 @@ import org.apache.rocketmq.logging.InternalLogger;
* should only be allocated to those. Otherwise, those message queues can be
shared along all consumers since there are
* no alive consumer to monopolize them.
*/
-public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy
{
- private final InternalLogger log = ClientLogger.getLog();
+public class AllocateMachineRoomNearby extends
AbstractAllocateMessageQueueStrategy {
private final AllocateMessageQueueStrategy
allocateMessageQueueStrategy;//actual allocate strategy
private final MachineRoomResolver machineRoomResolver;
@@ -58,22 +55,9 @@ public class AllocateMachineRoomNearby implements
AllocateMessageQueueStrategy {
@Override
public List<MessageQueue> allocate(String consumerGroup, String
currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll
empty");
- }
List<MessageQueue> result = new ArrayList<MessageQueue>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in
cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
+ if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
index 155e692ad..895f27757 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
@@ -18,36 +18,19 @@ package org.apache.rocketmq.client.consumer.rebalance;
import java.util.ArrayList;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
/**
* Average Hashing queue algorithm
*/
-public class AllocateMessageQueueAveragely implements
AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+public class AllocateMessageQueueAveragely extends
AbstractAllocateMessageQueueStrategy {
@Override
public List<MessageQueue> allocate(String consumerGroup, String
currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll
empty");
- }
List<MessageQueue> result = new ArrayList<MessageQueue>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in
cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
+ if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
index fe78f0a6b..d23350074 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
@@ -18,36 +18,19 @@ package org.apache.rocketmq.client.consumer.rebalance;
import java.util.ArrayList;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
/**
* Cycle average Hashing queue algorithm
*/
-public class AllocateMessageQueueAveragelyByCircle implements
AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+public class AllocateMessageQueueAveragelyByCircle extends
AbstractAllocateMessageQueueStrategy {
@Override
public List<MessageQueue> allocate(String consumerGroup, String
currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll
empty");
- }
List<MessageQueue> result = new ArrayList<MessageQueue>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in
cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
+ if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
index e548803d0..5866e95dd 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
@@ -17,10 +17,9 @@
package org.apache.rocketmq.client.consumer.rebalance;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
-public class AllocateMessageQueueByConfig implements
AllocateMessageQueueStrategy {
+public class AllocateMessageQueueByConfig extends
AbstractAllocateMessageQueueStrategy {
private List<MessageQueue> messageQueueList;
@Override
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
index 42a0be1dd..289242f8d 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
@@ -19,30 +19,22 @@ package org.apache.rocketmq.client.consumer.rebalance;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
/**
* Computer room Hashing queue algorithm, such as Alipay logic room
*/
-public class AllocateMessageQueueByMachineRoom implements
AllocateMessageQueueStrategy {
+public class AllocateMessageQueueByMachineRoom extends
AbstractAllocateMessageQueueStrategy {
private Set<String> consumeridcs;
@Override
public List<MessageQueue> allocate(String consumerGroup, String
currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
- if (StringUtils.isBlank(currentCID)) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (CollectionUtils.isEmpty(mqAll)) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (CollectionUtils.isEmpty(cidAll)) {
- throw new IllegalArgumentException("cidAll is null or cidAll
empty");
- }
+
List<MessageQueue> result = new ArrayList<MessageQueue>();
+ if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
+ return result;
+ }
int currentIndex = cidAll.indexOf(currentCID);
if (currentIndex < 0) {
return result;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
index 65dcf7992..7dededa76 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
@@ -19,19 +19,15 @@ package org.apache.rocketmq.client.consumer.rebalance;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
import org.apache.rocketmq.common.consistenthash.HashFunction;
import org.apache.rocketmq.common.consistenthash.Node;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
/**
* Consistent Hashing queue algorithm
*/
-public class AllocateMessageQueueConsistentHash implements
AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+public class AllocateMessageQueueConsistentHash extends
AbstractAllocateMessageQueueStrategy {
private final int virtualNodeCnt;
private final HashFunction customHashFunction;
@@ -56,22 +52,8 @@ public class AllocateMessageQueueConsistentHash implements
AllocateMessageQueueS
public List<MessageQueue> allocate(String consumerGroup, String
currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll
empty");
- }
-
List<MessageQueue> result = new ArrayList<MessageQueue>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in
cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
+ if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfigTest.java
b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfigTest.java
new file mode 100644
index 000000000..a1c925c3a
--- /dev/null
+++
b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfigTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import junit.framework.TestCase;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+
+public class AllocateMessageQueueByConfigTest extends TestCase {
+
+ public void testAllocateMessageQueueByConfig() {
+ List<String> consumerIdList = createConsumerIdList(2);
+ List<MessageQueue> messageQueueList = createMessageQueueList(4);
+ AllocateMessageQueueByConfig allocateStrategy = new
AllocateMessageQueueByConfig();
+ allocateStrategy.setMessageQueueList(messageQueueList);
+
+ Map<String, int[]> consumerAllocateQueue = new HashMap<String,
int[]>(consumerIdList.size());
+ for (String consumerId : consumerIdList) {
+ List<MessageQueue> queues = allocateStrategy.allocate("",
consumerId, messageQueueList, consumerIdList);
+ int[] queueIds = new int[queues.size()];
+ for (int i = 0; i < queues.size(); i++) {
+ queueIds[i] = queues.get(i).getQueueId();
+ }
+ consumerAllocateQueue.put(consumerId, queueIds);
+ }
+ Assert.assertArrayEquals(new int[] {0, 1, 2, 3},
consumerAllocateQueue.get("CID_PREFIX0"));
+ Assert.assertArrayEquals(new int[] {0, 1, 2, 3},
consumerAllocateQueue.get("CID_PREFIX1"));
+ }
+
+ private List<String> createConsumerIdList(int size) {
+ List<String> consumerIdList = new ArrayList<String>(size);
+ for (int i = 0; i < size; i++) {
+ consumerIdList.add("CID_PREFIX" + i);
+ }
+ return consumerIdList;
+ }
+
+ private List<MessageQueue> createMessageQueueList(int size) {
+ List<MessageQueue> messageQueueList = new
ArrayList<MessageQueue>(size);
+ for (int i = 0; i < size; i++) {
+ MessageQueue mq = new MessageQueue("topic", "brokerName", i);
+ messageQueueList.add(mq);
+ }
+ return messageQueueList;
+ }
+}
+
diff --git
a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoomTest.java
b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoomTest.java
new file mode 100644
index 000000000..7b6dc6d7d
--- /dev/null
+++
b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoomTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import junit.framework.TestCase;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+
+public class AllocateMessageQueueByMachineRoomTest extends TestCase {
+
+ public void testAllocateMessageQueueByMachineRoom() {
+ List<String> consumerIdList = createConsumerIdList(2);
+ List<MessageQueue> messageQueueList = createMessageQueueList(10);
+ Set<String> consumeridcs = new HashSet<String>();
+ consumeridcs.add("room1");
+ AllocateMessageQueueByMachineRoom allocateStrategy = new
AllocateMessageQueueByMachineRoom();
+ allocateStrategy.setConsumeridcs(consumeridcs);
+
+ // mqAll is null or mqAll empty
+ try {
+ allocateStrategy.allocate("", consumerIdList.get(0), new
ArrayList<MessageQueue>(), consumerIdList);
+ } catch (Exception e) {
+ assert e instanceof IllegalArgumentException;
+ Assert.assertEquals("mqAll is null or mqAll empty",
e.getMessage());
+ }
+
+ Map<String, int[]> consumerAllocateQueue = new HashMap<String,
int[]>(consumerIdList.size());
+ for (String consumerId : consumerIdList) {
+ List<MessageQueue> queues = allocateStrategy.allocate("",
consumerId, messageQueueList, consumerIdList);
+ int[] queueIds = new int[queues.size()];
+ for (int i = 0; i < queues.size(); i++) {
+ queueIds[i] = queues.get(i).getQueueId();
+ }
+ consumerAllocateQueue.put(consumerId, queueIds);
+ }
+ Assert.assertArrayEquals(new int[] {0, 1, 4},
consumerAllocateQueue.get("CID_PREFIX0"));
+ Assert.assertArrayEquals(new int[] {2, 3},
consumerAllocateQueue.get("CID_PREFIX1"));
+ }
+
+ private List<String> createConsumerIdList(int size) {
+ List<String> consumerIdList = new ArrayList<String>(size);
+ for (int i = 0; i < size; i++) {
+ consumerIdList.add("CID_PREFIX" + i);
+ }
+ return consumerIdList;
+ }
+
+ private List<MessageQueue> createMessageQueueList(int size) {
+ List<MessageQueue> messageQueueList = new
ArrayList<MessageQueue>(size);
+ for (int i = 0; i < size; i++) {
+ MessageQueue mq;
+ if (i < size / 2) {
+ mq = new MessageQueue("topic", "room1@broker-a", i);
+ } else {
+ mq = new MessageQueue("topic", "room2@broker-b", i);
+ }
+ messageQueueList.add(mq);
+ }
+ return messageQueueList;
+ }
+}
+