This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 96fb14256 Add unit test (#4697)
96fb14256 is described below

commit 96fb142569b3e3a5560ec57c84e351363841cf94
Author: Oliver <[email protected]>
AuthorDate: Thu Jul 28 19:44:28 2022 +0800

    Add unit test (#4697)
---
 .../broker/controller/ReplicasManagerTest.java     | 160 +++++++++++++++++++++
 1 file changed, 160 insertions(+)

diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
new file mode 100644
index 000000000..bb0c83502
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.controller;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.broker.slave.SlaveSynchronize;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.protocol.body.SyncStateSet;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetReplicaInfoResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.RegisterBrokerToControllerResponseHeader;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.util.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ReplicasManagerTest {
+
+    @Mock
+    private BrokerController brokerController;
+
+    private ReplicasManager replicasManager;
+
+    @Mock
+    private DefaultMessageStore defaultMessageStore;
+
+    private SlaveSynchronize slaveSynchronize;
+
+    private AutoSwitchHAService autoSwitchHAService;
+
+    private MessageStoreConfig messageStoreConfig;
+
+    private GetMetaDataResponseHeader getMetaDataResponseHeader;
+
+    private BrokerConfig brokerConfig;
+
+    @Mock
+    private BrokerOuterAPI brokerOuterAPI;
+
+    private RegisterBrokerToControllerResponseHeader 
registerBrokerToControllerResponseHeader;
+
+    private Pair<GetReplicaInfoResponseHeader, SyncStateSet> result;
+
+    private GetReplicaInfoResponseHeader getReplicaInfoResponseHeader;
+
+    private SyncStateSet syncStateSet;
+
+    private static final String OLD_MASTER_ADDRESS = "192.168.1.1";
+
+    private static final String NEW_MASTER_ADDRESS = "192.168.1.2";
+
+    private static final long MASTER_BROKER_ID = 0;
+
+    private static final long SLAVE_BROKER_ID = 2;
+
+    private static final int OLD_MASTER_EPOCH = 2;
+    private static final int NEW_MASTER_EPOCH = 3;
+
+    private static final String GROUP = "DEFAULT_GROUP";
+
+    private static final String LEADER_ID = "leader-1";
+
+    private static final Boolean IS_LEADER = true;
+
+    private static final String PEERS = "1.1.1.1";
+
+    private static final long SCHEDULE_SERVICE_EXEC_PERIOD = 5;
+
+    private static final String SYNC_STATE = "1";
+
+    @Before
+    public void before() throws Exception {
+        autoSwitchHAService = new AutoSwitchHAService();
+        messageStoreConfig = new MessageStoreConfig();
+        brokerConfig = new BrokerConfig();
+        slaveSynchronize = new SlaveSynchronize(brokerController);
+        getMetaDataResponseHeader = new GetMetaDataResponseHeader(GROUP, 
LEADER_ID, OLD_MASTER_ADDRESS, IS_LEADER, PEERS);
+        registerBrokerToControllerResponseHeader = new 
RegisterBrokerToControllerResponseHeader();
+        
registerBrokerToControllerResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS);
+        getReplicaInfoResponseHeader = new GetReplicaInfoResponseHeader();
+        getReplicaInfoResponseHeader.setMasterAddress(OLD_MASTER_ADDRESS);
+        getReplicaInfoResponseHeader.setBrokerId(MASTER_BROKER_ID);
+        getReplicaInfoResponseHeader.setMasterEpoch(NEW_MASTER_EPOCH);
+        syncStateSet = new SyncStateSet(Sets.newLinkedHashSet(SYNC_STATE), 
NEW_MASTER_EPOCH);
+        result = new Pair<>(getReplicaInfoResponseHeader, syncStateSet);
+        
when(defaultMessageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        
when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
+        
when(brokerController.getMessageStore().getHaService()).thenReturn(autoSwitchHAService);
+        when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+        
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        
when(brokerController.getSlaveSynchronize()).thenReturn(slaveSynchronize);
+        when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
+        when(brokerController.getBrokerAddr()).thenReturn(OLD_MASTER_ADDRESS);
+        
when(brokerOuterAPI.getControllerMetaData(any())).thenReturn(getMetaDataResponseHeader);
+        when(brokerOuterAPI.registerBrokerToController(any(), any(), any(), 
any())).thenReturn(registerBrokerToControllerResponseHeader);
+        when(brokerOuterAPI.getReplicaInfo(any(), any(), 
any())).thenReturn(result);
+        replicasManager = new ReplicasManager(brokerController);
+        autoSwitchHAService.init(defaultMessageStore);
+        replicasManager.start();
+        // execute schedulingSyncBrokerMetadata()
+        TimeUnit.SECONDS.sleep(SCHEDULE_SERVICE_EXEC_PERIOD);
+    }
+
+    @After
+    public void after() {
+        replicasManager.shutdown();
+        brokerController.shutdown();
+    }
+
+    @Test
+    public void changeBrokerRoleTest(){
+        // not equal to localAddress
+        Assertions.assertThatCode(() -> 
replicasManager.changeBrokerRole(NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, 
OLD_MASTER_EPOCH, SLAVE_BROKER_ID))
+            .doesNotThrowAnyException();
+
+        // equal to localAddress
+        Assertions.assertThatCode(() -> 
replicasManager.changeBrokerRole(OLD_MASTER_ADDRESS, NEW_MASTER_EPOCH, 
OLD_MASTER_EPOCH , SLAVE_BROKER_ID))
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    public void changeToMasterTest() {
+        Assertions.assertThatCode(() -> 
replicasManager.changeToMaster(NEW_MASTER_EPOCH, 
OLD_MASTER_EPOCH)).doesNotThrowAnyException();
+    }
+
+    @Test
+    public void changeToSlaveTest() {
+        Assertions.assertThatCode(() -> 
replicasManager.changeToSlave(NEW_MASTER_ADDRESS, NEW_MASTER_EPOCH, 
MASTER_BROKER_ID))
+            .doesNotThrowAnyException();
+    }
+}

Reply via email to