This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d855ebc96f [ISSUE #8640] Add more test coverage for Broker2Client
(#8641)
d855ebc96f is described below
commit d855ebc96f1dff85aa26bfc9253dca413eba586d
Author: yx9o <[email protected]>
AuthorDate: Fri Sep 6 09:38:51 2024 +0800
[ISSUE #8640] Add more test coverage for Broker2Client (#8641)
* [ISSUE #8640] Add more test coverage for Broker2Client
* Update
---
.../broker/client/net/Broker2ClientTest.java | 208 +++++++++++++++++++++
1 file changed, 208 insertions(+)
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/client/net/Broker2ClientTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/client/net/Broker2ClientTest.java
new file mode 100644
index 0000000000..865e7b608e
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/client/net/Broker2ClientTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client.net;
+
+import io.netty.channel.Channel;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
+import
org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader;
+import org.apache.rocketmq.store.MessageStore;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class Broker2ClientTest {
+
+ @Mock
+ private BrokerController brokerController;
+
+ @Mock
+ private RemotingServer remotingServer;
+
+ @Mock
+ private ConsumerManager consumerManager;
+
+ @Mock
+ private TopicConfigManager topicConfigManager;
+
+ @Mock
+ private ConsumerOffsetManager consumerOffsetManager;
+
+ @Mock
+ private Channel channel;
+
+ @Mock
+ private ConsumerGroupInfo consumerGroupInfo;
+
+ private Broker2Client broker2Client;
+
+ private final String defaultTopic = "defaultTopic";
+
+ private final String defaultBroker = "defaultBroker";
+
+ private final String defaultGroup = "defaultGroup";
+
+ private final long timestamp = System.currentTimeMillis();
+
+ private final boolean isForce = true;
+
+ @Before
+ public void init() {
+ broker2Client = new Broker2Client(brokerController);
+ when(brokerController.getRemotingServer()).thenReturn(remotingServer);
+
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+
when(brokerController.getBrokerConfig()).thenReturn(mock(BrokerConfig.class));
+
when(brokerController.getMessageStore()).thenReturn(mock(MessageStore.class));
+
when(consumerManager.getConsumerGroupInfo(any())).thenReturn(consumerGroupInfo);
+ }
+
+ @Test
+ public void testCheckProducerTransactionState() throws Exception {
+ CheckTransactionStateRequestHeader requestHeader = new
CheckTransactionStateRequestHeader();
+ broker2Client.checkProducerTransactionState("group", channel,
requestHeader, createMessageExt());
+ verify(remotingServer).invokeOneway(eq(channel),
any(RemotingCommand.class), eq(10L));
+ }
+
+ @Test
+ public void testCheckProducerTransactionStateException() throws Exception {
+ CheckTransactionStateRequestHeader requestHeader = new
CheckTransactionStateRequestHeader();
+ MessageExt messageExt = createMessageExt();
+ doThrow(new RuntimeException("Test Exception"))
+ .when(remotingServer)
+ .invokeOneway(any(Channel.class),
+ any(RemotingCommand.class),
+ anyLong());
+ broker2Client.checkProducerTransactionState("group", channel,
requestHeader, messageExt);
+ verify(brokerController.getRemotingServer()).invokeOneway(eq(channel),
any(RemotingCommand.class), eq(10L));
+ }
+
+ @Test
+ public void testResetOffsetNoTopicConfig() {
+
when(topicConfigManager.selectTopicConfig(defaultTopic)).thenReturn(null);
+ RemotingCommand response = broker2Client.resetOffset(defaultTopic,
defaultGroup, timestamp, isForce);
+ assertEquals(ResponseCode.SYSTEM_ERROR, response.getCode());
+ }
+
+ @Test
+ public void testResetOffsetNoConsumerGroupInfo() {
+ TopicConfig topicConfig = mock(TopicConfig.class);
+
when(topicConfigManager.selectTopicConfig(defaultTopic)).thenReturn(topicConfig);
+ when(topicConfig.getWriteQueueNums()).thenReturn(1);
+ when(consumerOffsetManager.queryOffset(defaultGroup, defaultTopic,
0)).thenReturn(0L);
+ RemotingCommand response = broker2Client.resetOffset(defaultTopic,
defaultGroup, timestamp, isForce);
+ assertEquals(ResponseCode.CONSUMER_NOT_ONLINE, response.getCode());
+ }
+
+ @Test
+ public void testResetOffset() {
+ TopicConfig topicConfig = mock(TopicConfig.class);
+
when(topicConfigManager.selectTopicConfig(defaultTopic)).thenReturn(topicConfig);
+ when(topicConfig.getWriteQueueNums()).thenReturn(1);
+
when(brokerController.getConsumerOffsetManager().queryOffset(defaultGroup,
defaultTopic, 0)).thenReturn(0L);
+ BrokerConfig brokerConfig = mock(BrokerConfig.class);
+ when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+ when(brokerConfig.getBrokerName()).thenReturn(defaultBroker);
+ ConsumerGroupInfo consumerGroupInfo = mock(ConsumerGroupInfo.class);
+
when(consumerManager.getConsumerGroupInfo(defaultGroup)).thenReturn(consumerGroupInfo);
+ RemotingCommand response = broker2Client.resetOffset(defaultTopic,
defaultGroup, timestamp, isForce);
+ assertEquals(ResponseCode.CONSUMER_NOT_ONLINE, response.getCode());
+ }
+
+ @Test
+ public void testGetConsumeStatusNoConsumerOnline() {
+ when(consumerGroupInfo.getChannelInfoTable()).thenReturn(new
ConcurrentHashMap<>());
+ RemotingCommand response =
broker2Client.getConsumeStatus(defaultTopic, defaultGroup, "");
+ assertEquals(ResponseCode.SYSTEM_ERROR, response.getCode());
+ }
+
+ @Test
+ public void testGetConsumeStatusClientDoesNotSupportFeature() {
+ ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel,
"defaultClientId", null, MQVersion.Version.V3_0_6.ordinal());
+ ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = new
ConcurrentHashMap<>();
+ channelInfoTable.put(channel, clientChannelInfo);
+
when(consumerGroupInfo.getChannelInfoTable()).thenReturn(channelInfoTable);
+ RemotingCommand response =
broker2Client.getConsumeStatus(defaultTopic, defaultGroup, "");
+ assertEquals(ResponseCode.SYSTEM_ERROR, response.getCode());
+ }
+
+ @Test
+ public void testGetConsumeStatus() throws Exception {
+ ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = new
ConcurrentHashMap<>();
+ ClientChannelInfo clientChannelInfo = mock(ClientChannelInfo.class);
+
when(clientChannelInfo.getVersion()).thenReturn(MQVersion.CURRENT_VERSION);
+ channelInfoTable.put(channel, clientChannelInfo);
+
when(consumerGroupInfo.getChannelInfoTable()).thenReturn(channelInfoTable);
+ RemotingCommand responseMock = mock(RemotingCommand.class);
+ when(responseMock.getCode()).thenReturn(ResponseCode.SUCCESS);
+
when(responseMock.getBody()).thenReturn("{\"consumerTable\":{}}".getBytes(StandardCharsets.UTF_8));
+ when(remotingServer.invokeSync(any(Channel.class),
any(RemotingCommand.class), anyLong())).thenReturn(responseMock);
+ RemotingCommand response =
broker2Client.getConsumeStatus(defaultTopic, defaultGroup, "");
+ assertEquals(ResponseCode.SUCCESS, response.getCode());
+ GetConsumerStatusBody body =
RemotingSerializable.decode(response.getBody(), GetConsumerStatusBody.class);
+ assertEquals(1, body.getConsumerTable().size());
+ }
+
+ private MessageExt createMessageExt() {
+ MessageExt result = new MessageExt();
+ result.setBody("body".getBytes(StandardCharsets.UTF_8));
+ result.setTopic(defaultTopic);
+ result.setBrokerName(defaultBroker);
+ result.putUserProperty("key", "value");
+ result.getProperties().put(MessageConst.PROPERTY_PRODUCER_GROUP,
defaultGroup);
+
result.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
"TX1");
+ result.setKeys("keys");
+ SocketAddress bornHost = new InetSocketAddress("127.0.0.1", 12911);
+ SocketAddress storeHost = new InetSocketAddress("127.0.0.1", 10911);
+ result.setStoreHost(storeHost);
+ result.setBornHost(bornHost);
+ return result;
+ }
+}