This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 304987d341 [ISSUE #8446] Add more test coverage for MQClientInstance 
(#8447)
304987d341 is described below

commit 304987d341f17f00e88a4ef2396f04950b4e1720
Author: yx9o <[email protected]>
AuthorDate: Fri Jul 26 10:23:55 2024 +0800

    [ISSUE #8446] Add more test coverage for MQClientInstance (#8447)
    
    * [ISSUE #8446] Add more test coverage for MQClientInstance
    
    * Update
    
    * Update
---
 .../client/impl/factory/MQClientInstanceTest.java  | 402 ++++++++++++++++++---
 1 file changed, 361 insertions(+), 41 deletions(-)

diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index acd792b862..d71bc25b9b 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -16,77 +16,116 @@
  */
 package org.apache.rocketmq.client.impl.factory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
 import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.common.HeartbeatV2Result;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.remoting.protocol.route.BrokerData;
 import org.apache.rocketmq.remoting.protocol.route.QueueData;
 import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MQClientInstanceTest {
-    private MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
-    private String topic = "FooBar";
-    private String group = "FooBarGroup";
-    private ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable = new 
ConcurrentHashMap<>();
 
-    @Before
-    public void init() throws Exception {
-        FieldUtils.writeDeclaredField(mqClientInstance, "brokerAddrTable", 
brokerAddrTable, true);
-    }
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
 
-    @Test
-    public void testTopicRouteData2TopicPublishInfo() {
-        TopicRouteData topicRouteData = new TopicRouteData();
+    @Mock
+    private RemotingClient remotingClient;
 
-        topicRouteData.setFilterServerTable(new HashMap<>());
-        List<BrokerData> brokerDataList = new ArrayList<>();
-        BrokerData brokerData = new BrokerData();
-        brokerData.setBrokerName("BrokerA");
-        brokerData.setCluster("DefaultCluster");
-        HashMap<Long, String> brokerAddrs = new HashMap<>();
-        brokerAddrs.put(0L, "127.0.0.1:10911");
-        brokerData.setBrokerAddrs(brokerAddrs);
-        brokerDataList.add(brokerData);
-        topicRouteData.setBrokerDatas(brokerDataList);
+    @Mock
+    private ClientConfig clientConfig;
 
-        List<QueueData> queueDataList = new ArrayList<>();
-        QueueData queueData = new QueueData();
-        queueData.setBrokerName("BrokerA");
-        queueData.setPerm(6);
-        queueData.setReadQueueNums(3);
-        queueData.setWriteQueueNums(4);
-        queueData.setTopicSysFlag(0);
-        queueDataList.add(queueData);
-        topicRouteData.setQueueDatas(queueDataList);
+    private final MQClientInstance mqClientInstance = 
MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
+
+    private final String topic = "FooBar";
+
+    private final String group = "FooBarGroup";
+
+    private final String defaultBrokerAddr = "127.0.0.1:10911";
+
+    private final String defaultBroker = "BrokerA";
+
+    private final ConcurrentMap<String, HashMap<Long, String>> brokerAddrTable 
= new ConcurrentHashMap<>();
 
-        TopicPublishInfo topicPublishInfo = 
MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
+    private final ConcurrentMap<String, MQConsumerInner> consumerTable = new 
ConcurrentHashMap<>();
 
-        assertThat(topicPublishInfo.isHaveTopicRouterInfo()).isFalse();
-        assertThat(topicPublishInfo.getMessageQueueList().size()).isEqualTo(4);
+    private final ConcurrentMap<String, TopicRouteData> topicRouteTable = new 
ConcurrentHashMap<>();
+
+    @Before
+    public void init() throws Exception {
+        when(mQClientAPIImpl.getRemotingClient()).thenReturn(remotingClient);
+        FieldUtils.writeDeclaredField(mqClientInstance, "brokerAddrTable", 
brokerAddrTable, true);
+        FieldUtils.writeDeclaredField(mqClientInstance, "mQClientAPIImpl", 
mQClientAPIImpl, true);
+        FieldUtils.writeDeclaredField(mqClientInstance, "consumerTable", 
consumerTable, true);
+        FieldUtils.writeDeclaredField(mqClientInstance, "clientConfig", 
clientConfig, true);
+        FieldUtils.writeDeclaredField(mqClientInstance, "topicRouteTable", 
topicRouteTable, true);
     }
 
     @Test
@@ -131,7 +170,7 @@ public class MQClientInstanceTest {
     }
 
     @Test
-    public void testRegisterConsumer() throws RemotingException, 
InterruptedException, MQBrokerException {
+    public void testRegisterConsumer() {
         boolean flag = mqClientInstance.registerConsumer(group, 
mock(MQConsumerInner.class));
         assertThat(flag).isTrue();
 
@@ -143,7 +182,6 @@ public class MQClientInstanceTest {
         assertThat(flag).isTrue();
     }
 
-
     @Test
     public void testConsumerRunningInfoWhenConsumersIsEmptyOrNot() throws 
RemotingException, InterruptedException, MQBrokerException {
         MQConsumerInner mockConsumerInner = mock(MQConsumerInner.class);
@@ -181,4 +219,286 @@ public class MQClientInstanceTest {
         assertThat(flag).isTrue();
     }
 
+    @Test
+    public void testTopicRouteData2TopicPublishInfo() {
+        TopicPublishInfo actual = 
MQClientInstance.topicRouteData2TopicPublishInfo(topic, createTopicRouteData());
+        assertThat(actual.isHaveTopicRouterInfo()).isFalse();
+        assertThat(actual.getMessageQueueList().size()).isEqualTo(4);
+    }
+
+    @Test
+    public void testTopicRouteData2TopicPublishInfoWithOrderTopicConf() {
+        TopicRouteData topicRouteData = createTopicRouteData();
+        when(topicRouteData.getOrderTopicConf()).thenReturn("127.0.0.1:4");
+        TopicPublishInfo actual = 
MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
+        assertFalse(actual.isHaveTopicRouterInfo());
+        assertEquals(4, actual.getMessageQueueList().size());
+    }
+
+    @Test
+    public void 
testTopicRouteData2TopicPublishInfoWithTopicQueueMappingByBroker() {
+        TopicRouteData topicRouteData = createTopicRouteData();
+        
when(topicRouteData.getTopicQueueMappingByBroker()).thenReturn(Collections.singletonMap(topic,
 new TopicQueueMappingInfo()));
+        TopicPublishInfo actual = 
MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
+        assertFalse(actual.isHaveTopicRouterInfo());
+        assertEquals(0, actual.getMessageQueueList().size());
+    }
+
+    @Test
+    public void testTopicRouteData2TopicSubscribeInfo() {
+        TopicRouteData topicRouteData = createTopicRouteData();
+        
when(topicRouteData.getTopicQueueMappingByBroker()).thenReturn(Collections.singletonMap(topic,
 new TopicQueueMappingInfo()));
+        Set<MessageQueue> actual = 
MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
+        assertNotNull(actual);
+        assertEquals(0, actual.size());
+    }
+
+    @Test
+    public void testParseOffsetTableFromBroker() {
+        Map<MessageQueue, Long> offsetTable = new HashMap<>();
+        offsetTable.put(new MessageQueue(), 0L);
+        Map<MessageQueue, Long> actual = 
mqClientInstance.parseOffsetTableFromBroker(offsetTable, "defaultNamespace");
+        assertNotNull(actual);
+        assertEquals(1, actual.size());
+    }
+
+    @Test
+    public void testCheckClientInBroker() throws MQClientException, 
RemotingSendRequestException, RemotingConnectException, 
RemotingTimeoutException, InterruptedException {
+        doThrow(new MQClientException("checkClientInBroker exception", 
null)).when(mQClientAPIImpl).checkClientInBroker(
+                any(),
+                any(),
+                any(),
+                any(SubscriptionData.class),
+                anyLong());
+        topicRouteTable.put(topic, createTopicRouteData());
+        MQConsumerInner mqConsumerInner = createMQConsumerInner();
+        mqConsumerInner.subscriptions().clear();
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setTopic(topic);
+        subscriptionData.setExpressionType("type");
+        mqConsumerInner.subscriptions().add(subscriptionData);
+        consumerTable.put(group, mqConsumerInner);
+        Throwable thrown = assertThrows(MQClientException.class, 
mqClientInstance::checkClientInBroker);
+        assertTrue(thrown.getMessage().contains("checkClientInBroker 
exception"));
+    }
+
+    @Test
+    public void testSendHeartbeatToBrokerV1() {
+        consumerTable.put(group, createMQConsumerInner());
+        assertTrue(mqClientInstance.sendHeartbeatToBroker(0L, defaultBroker, 
defaultBrokerAddr));
+    }
+
+    @Test
+    public void testSendHeartbeatToBrokerV2() throws MQBrokerException, 
RemotingException, InterruptedException {
+        consumerTable.put(group, createMQConsumerInner());
+        when(clientConfig.isUseHeartbeatV2()).thenReturn(true);
+        HeartbeatV2Result heartbeatV2Result = mock(HeartbeatV2Result.class);
+        when(heartbeatV2Result.isSupportV2()).thenReturn(true);
+        when(mQClientAPIImpl.sendHeartbeatV2(any(), any(HeartbeatData.class), 
anyLong())).thenReturn(heartbeatV2Result);
+        assertTrue(mqClientInstance.sendHeartbeatToBroker(0L, defaultBroker, 
defaultBrokerAddr));
+    }
+
+    @Test
+    public void testSendHeartbeatToAllBrokerWithLockV1() {
+        brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
+        consumerTable.put(group, createMQConsumerInner());
+        assertTrue(mqClientInstance.sendHeartbeatToAllBrokerWithLock());
+    }
+
+    @Test
+    public void testSendHeartbeatToAllBrokerWithLockV2() {
+        brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
+        consumerTable.put(group, createMQConsumerInner());
+        when(clientConfig.isUseHeartbeatV2()).thenReturn(true);
+        assertTrue(mqClientInstance.sendHeartbeatToAllBrokerWithLock());
+    }
+
+    @Test
+    public void testUpdateTopicRouteInfoFromNameServer() throws 
RemotingException, InterruptedException, MQClientException {
+        brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
+        consumerTable.put(group, createMQConsumerInner());
+        DefaultMQProducer defaultMQProducer = mock(DefaultMQProducer.class);
+        TopicRouteData topicRouteData = createTopicRouteData();
+        
when(mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(anyLong())).thenReturn(topicRouteData);
+        assertFalse(mqClientInstance.updateTopicRouteInfoFromNameServer(topic, 
true, defaultMQProducer));
+    }
+
+    @Test
+    public void testFindBrokerAddressInAdmin() {
+        brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
+        consumerTable.put(group, createMQConsumerInner());
+        FindBrokerResult actual = 
mqClientInstance.findBrokerAddressInAdmin(defaultBroker);
+        assertNotNull(actual);
+        assertEquals(defaultBrokerAddr, actual.getBrokerAddr());
+    }
+
+    @Test
+    public void testFindBrokerAddressInSubscribeWithOneBroker() throws 
IllegalAccessException {
+        brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
+        consumerTable.put(group, createMQConsumerInner());
+        ConcurrentMap<String, HashMap<String, Integer>> brokerVersionTable = 
new ConcurrentHashMap<>();
+        HashMap<String, Integer> addressMap = new HashMap<>();
+        addressMap.put(defaultBrokerAddr, 0);
+        brokerVersionTable.put(defaultBroker, addressMap);
+        FieldUtils.writeDeclaredField(mqClientInstance, "brokerVersionTable", 
brokerVersionTable, true);
+        FindBrokerResult actual = 
mqClientInstance.findBrokerAddressInSubscribe(defaultBroker, 1L, false);
+        assertNotNull(actual);
+        assertEquals(defaultBrokerAddr, actual.getBrokerAddr());
+    }
+
+    @Test
+    public void testFindConsumerIdList() {
+        topicRouteTable.put(topic, createTopicRouteData());
+        brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
+        consumerTable.put(group, createMQConsumerInner());
+        List<String> actual = mqClientInstance.findConsumerIdList(topic, 
group);
+        assertNotNull(actual);
+        assertEquals(0, actual.size());
+    }
+
+    @Test
+    public void testQueryAssignment() throws MQBrokerException, 
RemotingException, InterruptedException {
+        topicRouteTable.put(topic, createTopicRouteData());
+        brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
+        consumerTable.put(group, createMQConsumerInner());
+        Set<MessageQueueAssignment> actual = 
mqClientInstance.queryAssignment(topic, group, "", MessageModel.CLUSTERING, 
1000);
+        assertNotNull(actual);
+        assertEquals(0, actual.size());
+    }
+
+    @Test
+    public void testResetOffset() throws IllegalAccessException {
+        topicRouteTable.put(topic, createTopicRouteData());
+        brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
+        consumerTable.put(group, createMQConsumerInner());
+        Map<MessageQueue, Long> offsetTable = new HashMap<>();
+        offsetTable.put(createMessageQueue(), 0L);
+        mqClientInstance.resetOffset(topic, group, offsetTable);
+        Field consumerTableField = 
FieldUtils.getDeclaredField(mqClientInstance.getClass(), "consumerTable", true);
+        ConcurrentMap<String, MQConsumerInner> consumerTable = 
(ConcurrentMap<String, MQConsumerInner>) 
consumerTableField.get(mqClientInstance);
+        DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) 
consumerTable.get(group);
+        verify(consumer).suspend();
+        verify(consumer).resume();
+        verify(consumer, times(1))
+                .updateConsumeOffset(
+                        any(MessageQueue.class),
+                        eq(0L));
+    }
+
+    @Test
+    public void testGetConsumerStatus() {
+        topicRouteTable.put(topic, createTopicRouteData());
+        brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
+        consumerTable.put(group, createMQConsumerInner());
+        Map<MessageQueue, Long> actual = 
mqClientInstance.getConsumerStatus(topic, group);
+        assertNotNull(actual);
+        assertEquals(0, actual.size());
+    }
+
+    @Test
+    public void testGetAnExistTopicRouteData() {
+        topicRouteTable.put(topic, createTopicRouteData());
+        TopicRouteData actual = 
mqClientInstance.getAnExistTopicRouteData(topic);
+        assertNotNull(actual);
+        assertNotNull(actual.getQueueDatas());
+        assertNotNull(actual.getBrokerDatas());
+    }
+
+    @Test
+    public void testConsumeMessageDirectly() {
+        consumerTable.put(group, createMQConsumerInner());
+        assertNull(mqClientInstance.consumeMessageDirectly(createMessageExt(), 
group, defaultBroker));
+    }
+
+    @Test
+    public void testQueryTopicRouteData() {
+        consumerTable.put(group, createMQConsumerInner());
+        topicRouteTable.put(topic, createTopicRouteData());
+        TopicRouteData actual = mqClientInstance.queryTopicRouteData(topic);
+        assertNotNull(actual);
+        assertNotNull(actual.getQueueDatas());
+        assertNotNull(actual.getBrokerDatas());
+    }
+
+    private MessageExt createMessageExt() {
+        MessageExt result = new MessageExt();
+        result.setBody("body".getBytes(StandardCharsets.UTF_8));
+        result.setTopic(topic);
+        result.setBrokerName(defaultBroker);
+        result.putUserProperty("key", "value");
+        result.getProperties().put(MessageConst.PROPERTY_PRODUCER_GROUP, 
group);
+        
result.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, 
"TX1");
+        long curTime = System.currentTimeMillis();
+        result.setBornTimestamp(curTime - 1000);
+        result.getProperties().put(MessageConst.PROPERTY_POP_CK, curTime + " " 
+ curTime + " " + curTime + " " + curTime);
+        result.setKeys("keys");
+        result.setSysFlag(MessageSysFlag.INNER_BATCH_FLAG);
+        result.setSysFlag(result.getSysFlag() | 
MessageSysFlag.NEED_UNWRAP_FLAG);
+        SocketAddress bornHost = new InetSocketAddress("127.0.0.1", 12911);
+        SocketAddress storeHost = new InetSocketAddress("127.0.0.1", 10911);
+        result.setBornHost(bornHost);
+        result.setStoreHost(storeHost);
+        return result;
+    }
+
+    private MessageQueue createMessageQueue() {
+        MessageQueue result = new MessageQueue();
+        result.setQueueId(0);
+        result.setBrokerName(defaultBroker);
+        result.setTopic(topic);
+        return result;
+    }
+
+    private TopicRouteData createTopicRouteData() {
+        TopicRouteData result = mock(TopicRouteData.class);
+        when(result.getBrokerDatas()).thenReturn(createBrokerDatas());
+        when(result.getQueueDatas()).thenReturn(createQueueDatas());
+        return result;
+    }
+
+    private HashMap<Long, String> createBrokerAddrMap() {
+        HashMap<Long, String> result = new HashMap<>();
+        result.put(0L, defaultBrokerAddr);
+        return result;
+    }
+
+    private MQConsumerInner createMQConsumerInner() {
+        DefaultMQPushConsumerImpl result = 
mock(DefaultMQPushConsumerImpl.class);
+        Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
+        SubscriptionData subscriptionData = mock(SubscriptionData.class);
+        subscriptionDataSet.add(subscriptionData);
+        when(result.subscriptions()).thenReturn(subscriptionDataSet);
+        RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
+        ConcurrentMap<MessageQueue, ProcessQueue> processQueueMap = new 
ConcurrentHashMap<>();
+        ProcessQueue processQueue = new ProcessQueue();
+        processQueueMap.put(createMessageQueue(), processQueue);
+        when(rebalanceImpl.getProcessQueueTable()).thenReturn(processQueueMap);
+        when(result.getRebalanceImpl()).thenReturn(rebalanceImpl);
+        OffsetStore offsetStore = mock(OffsetStore.class);
+        when(result.getOffsetStore()).thenReturn(offsetStore);
+        ConsumeMessageService consumeMessageService = 
mock(ConsumeMessageService.class);
+        
when(result.getConsumeMessageService()).thenReturn(consumeMessageService);
+        return result;
+    }
+
+    private List<QueueData> createQueueDatas() {
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName(defaultBroker);
+        queueData.setPerm(6);
+        queueData.setReadQueueNums(3);
+        queueData.setWriteQueueNums(4);
+        queueData.setTopicSysFlag(0);
+        return Collections.singletonList(queueData);
+    }
+
+    private List<BrokerData> createBrokerDatas() {
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName(defaultBroker);
+        String defaultCluster = "defaultCluster";
+        brokerData.setCluster(defaultCluster);
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(MixAll.MASTER_ID, defaultBrokerAddr);
+        brokerData.setBrokerAddrs(brokerAddrs);
+        return Collections.singletonList(brokerData);
+    }
 }

Reply via email to