This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 19ef754177 [ISSUE #8421] Add more test coverage for SlaveSynchronize
(#8422)
19ef754177 is described below
commit 19ef75417751ee81f690c318895ad3c1c5143ce4
Author: Tan Xiang <[email protected]>
AuthorDate: Tue Jul 23 10:26:21 2024 +0800
[ISSUE #8421] Add more test coverage for SlaveSynchronize (#8422)
---
.../broker/slave/SlaveSynchronizeTest.java | 206 +++++++++++++++++++++
1 file changed, 206 insertions(+)
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java
new file mode 100644
index 0000000000..95db733d0d
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.slave;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
+import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import
org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
+import
org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
+import
org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.timer.TimerCheckpoint;
+import org.apache.rocketmq.store.timer.TimerMessageStore;
+import org.apache.rocketmq.store.timer.TimerMetrics;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.UnsupportedEncodingException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SlaveSynchronizeTest {
+ @Spy
+ private BrokerController brokerController = new BrokerController(new
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new
MessageStoreConfig());
+
+ private SlaveSynchronize slaveSynchronize;
+
+ @Mock
+ private BrokerOuterAPI brokerOuterAPI;
+
+ @Mock
+ private TopicConfigManager topicConfigManager;
+
+ @Mock
+ private ConsumerOffsetManager consumerOffsetManager;
+
+ @Mock
+ private MessageStoreConfig messageStoreConfig;
+
+ @Mock
+ private MessageStore messageStore;
+
+ @Mock
+ private ScheduleMessageService scheduleMessageService;
+
+ @Mock
+ private SubscriptionGroupManager subscriptionGroupManager;
+
+ @Mock
+ private QueryAssignmentProcessor queryAssignmentProcessor;
+
+ @Mock
+ private MessageRequestModeManager messageRequestModeManager;
+
+ @Mock
+ private TimerMessageStore timerMessageStore;
+
+ @Mock
+ private TimerMetrics timerMetrics;
+
+ @Mock
+ private TimerCheckpoint timerCheckpoint;
+
+ private static final String BROKER_ADDR = "127.0.0.1:10911";
+
+ @Before
+ public void init() {
+ when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
+
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+
when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService);
+
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
+
when(brokerController.getQueryAssignmentProcessor()).thenReturn(queryAssignmentProcessor);
+ when(brokerController.getMessageStore()).thenReturn(messageStore);
+
when(brokerController.getTimerMessageStore()).thenReturn(timerMessageStore);
+
when(brokerController.getTimerCheckpoint()).thenReturn(timerCheckpoint);
+ when(topicConfigManager.getDataVersion()).thenReturn(new
DataVersion());
+ when(topicConfigManager.getTopicConfigTable()).thenReturn(new
ConcurrentHashMap<>());
+
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+ when(consumerOffsetManager.getOffsetTable()).thenReturn(new
ConcurrentHashMap<>());
+ when(consumerOffsetManager.getDataVersion()).thenReturn(new
DataVersion());
+ when(subscriptionGroupManager.getDataVersion()).thenReturn(new
DataVersion());
+
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new
ConcurrentHashMap<>());
+
when(queryAssignmentProcessor.getMessageRequestModeManager()).thenReturn(messageRequestModeManager);
+
when(messageRequestModeManager.getMessageRequestModeMap()).thenReturn(new
ConcurrentHashMap<>());
+ when(messageStoreConfig.isTimerWheelEnable()).thenReturn(true);
+
when(messageStore.getTimerMessageStore()).thenReturn(timerMessageStore);
+ when(timerMessageStore.isShouldRunningDequeue()).thenReturn(false);
+ when(timerMessageStore.getTimerMetrics()).thenReturn(timerMetrics);
+ when(timerMetrics.getDataVersion()).thenReturn(new DataVersion());
+ when(timerCheckpoint.getDataVersion()).thenReturn(new DataVersion());
+ slaveSynchronize = new SlaveSynchronize(brokerController);
+ slaveSynchronize.setMasterAddr(BROKER_ADDR);
+ }
+
+ @Test
+ public void testSyncAll() throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException, UnsupportedEncodingException {
+ TopicConfig newTopicConfig = new TopicConfig("NewTopic");
+
when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(createTopicConfigWrapper(newTopicConfig));
+
when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper());
+ when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn("");
+
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper());
+
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper());
+
when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper());
+ slaveSynchronize.syncAll();
+ Assert.assertEquals(1,
this.brokerController.getTopicConfigManager().getDataVersion().getStateVersion());
+ Assert.assertEquals(1,
this.brokerController.getTopicQueueMappingManager().getDataVersion().getStateVersion());
+ Assert.assertEquals(1,
consumerOffsetManager.getDataVersion().getStateVersion());
+ Assert.assertEquals(1,
subscriptionGroupManager.getDataVersion().getStateVersion());
+ Assert.assertEquals(1,
timerMetrics.getDataVersion().getStateVersion());
+ }
+
+ @Test
+ public void testGetMasterAddr() {
+ Assert.assertEquals(BROKER_ADDR, slaveSynchronize.getMasterAddr());
+ }
+
+ @Test
+ public void testSyncTimerCheckPoint() throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, MQBrokerException,
InterruptedException {
+
when(brokerOuterAPI.getTimerCheckPoint(anyString())).thenReturn(timerCheckpoint);
+ slaveSynchronize.syncTimerCheckPoint();
+ Assert.assertEquals(0,
timerCheckpoint.getDataVersion().getStateVersion());
+ }
+
+ private TopicConfigAndMappingSerializeWrapper
createTopicConfigWrapper(TopicConfig topicConfig) {
+ TopicConfigAndMappingSerializeWrapper wrapper = new
TopicConfigAndMappingSerializeWrapper();
+ wrapper.setTopicConfigTable(new ConcurrentHashMap<>());
+ wrapper.getTopicConfigTable().put(topicConfig.getTopicName(),
topicConfig);
+ DataVersion dataVersion = new DataVersion();
+ dataVersion.setStateVersion(1L);
+ wrapper.setDataVersion(dataVersion);
+ wrapper.setMappingDataVersion(dataVersion);
+ return wrapper;
+ }
+
+ private ConsumerOffsetSerializeWrapper createConsumerOffsetWrapper() {
+ ConsumerOffsetSerializeWrapper wrapper = new
ConsumerOffsetSerializeWrapper();
+ wrapper.setOffsetTable(new ConcurrentHashMap<>());
+ DataVersion dataVersion = new DataVersion();
+ dataVersion.setStateVersion(1L);
+ wrapper.setDataVersion(dataVersion);
+ return wrapper;
+ }
+
+ private SubscriptionGroupWrapper createSubscriptionGroupWrapper() {
+ SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper();
+ wrapper.setSubscriptionGroupTable(new ConcurrentHashMap<>());
+ DataVersion dataVersion = new DataVersion();
+ dataVersion.setStateVersion(1L);
+ wrapper.setDataVersion(dataVersion);
+ return wrapper;
+ }
+
+ private MessageRequestModeSerializeWrapper
createMessageRequestModeWrapper() {
+ MessageRequestModeSerializeWrapper wrapper = new
MessageRequestModeSerializeWrapper();
+ wrapper.setMessageRequestModeMap(new ConcurrentHashMap<>());
+ return wrapper;
+ }
+
+ private TimerMetrics.TimerMetricsSerializeWrapper
createTimerMetricsWrapper() {
+ TimerMetrics.TimerMetricsSerializeWrapper wrapper = new
TimerMetrics.TimerMetricsSerializeWrapper();
+ wrapper.setTimingCount(new ConcurrentHashMap<>());
+ DataVersion dataVersion = new DataVersion();
+ dataVersion.setStateVersion(1L);
+ wrapper.setDataVersion(dataVersion);
+ return wrapper;
+ }
+}