This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 87fb1f5f7b [ISSUE #8495] Add more test coverage for
PeekMessageProcessor (#8498)
87fb1f5f7b is described below
commit 87fb1f5f7b702fc7c0a2f7f0906c18c3838cedc9
Author: Tan Xiang <[email protected]>
AuthorDate: Tue Aug 6 14:04:10 2024 +0800
[ISSUE #8495] Add more test coverage for PeekMessageProcessor (#8498)
* [ISSUE #8495]add more test coverage for PeekMessageProcessor
* [ISSUE #8495]add more test coverage for PeekMessageProcessor
---
.../broker/processor/PeekMessageProcessorTest.java | 170 +++++++++++++++++++++
1 file changed, 170 insertions(+)
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java
new file mode 100644
index 0000000000..7f8504453c
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PeekMessageProcessorTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.header.PeekMessageRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PeekMessageProcessorTest {
+
+ private PeekMessageProcessor peekMessageProcessor;
+
+ @Spy
+ private BrokerController brokerController = new BrokerController(new
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new
MessageStoreConfig());
+
+ @Mock
+ private ChannelHandlerContext handlerContext;
+
+ @Mock
+ private MessageStore messageStore;
+
+ @Mock
+ private SubscriptionGroupManager subscriptionGroupManager;
+
+ @Mock
+ private ConsumerOffsetManager consumerOffsetManager;
+
+ @Mock
+ private SubscriptionGroupConfig subscriptionGroupConfig;
+
+ @Mock
+ private Channel channel;
+
+ private TopicConfigManager topicConfigManager;
+
+ @Before
+ public void init() {
+ peekMessageProcessor = new PeekMessageProcessor(brokerController);
+ when(brokerController.getMessageStore()).thenReturn(messageStore);
+ topicConfigManager = new TopicConfigManager(brokerController);
+
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
+
when(subscriptionGroupManager.findSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupConfig);
+ when(subscriptionGroupConfig.isConsumeEnable()).thenReturn(true);
+ topicConfigManager.getTopicConfigTable().put("topic", new
TopicConfig("topic"));
+
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+ when(consumerOffsetManager.queryOffset(anyString(), anyString(),
anyInt())).thenReturn(-1L);
+
when(messageStore.getMinOffsetInQueue(anyString(),anyInt())).thenReturn(0L);
+ when(handlerContext.channel()).thenReturn(channel);
+ when(channel.remoteAddress()).thenReturn(new
InetSocketAddress("127.0.0.1", 12345));
+ }
+
+ @Test
+ public void testProcessRequest() throws RemotingCommandException {
+ RemotingCommand request = createPeekMessageRequest("group","topic",0);
+ GetMessageResult getMessageResult = new GetMessageResult();
+ getMessageResult.setStatus(GetMessageStatus.FOUND);
+ ByteBuffer bb = ByteBuffer.allocate(64);
+ bb.putLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION,
System.currentTimeMillis());
+ SelectMappedBufferResult mappedBufferResult1 = new
SelectMappedBufferResult(0, bb, 64, null);
+ for (int i = 0; i < 10;i++) {
+ getMessageResult.addMessage(mappedBufferResult1);
+ }
+
when(messageStore.getMessage(anyString(),anyString(),anyInt(),anyLong(),anyInt(),any())).thenReturn(getMessageResult);
+ RemotingCommand response =
peekMessageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testProcessRequest_NoPermission() throws
RemotingCommandException {
+
this.brokerController.getBrokerConfig().setBrokerPermission(PermName.PERM_WRITE);
+ RemotingCommand request = createPeekMessageRequest("group","topic",0);
+ RemotingCommand response =
peekMessageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
+
this.brokerController.getBrokerConfig().setBrokerPermission(PermName.PERM_WRITE
| PermName.PERM_READ);
+
+
topicConfigManager.getTopicConfigTable().get("topic").setPerm(PermName.PERM_WRITE);
+ response = peekMessageProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
+
topicConfigManager.getTopicConfigTable().get("topic").setPerm(PermName.PERM_WRITE
| PermName.PERM_READ);
+
+ when(subscriptionGroupConfig.isConsumeEnable()).thenReturn(false);
+ response = peekMessageProcessor.processRequest(handlerContext,
request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
+ }
+
+ @Test
+ public void testProcessRequest_TopicNotExist() throws
RemotingCommandException {
+ RemotingCommand request =
createPeekMessageRequest("group1","topic1",0);
+ RemotingCommand response =
peekMessageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
+ }
+
+ @Test
+ public void testProcessRequest_SubscriptionGroupNotExist() throws
RemotingCommandException {
+
when(subscriptionGroupManager.findSubscriptionGroupConfig(anyString())).thenReturn(null);
+ RemotingCommand request = createPeekMessageRequest("group","topic",0);
+ RemotingCommand response =
peekMessageProcessor.processRequest(handlerContext, request);
+
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+ }
+
+ @Test
+ public void testProcessRequest_QueueIdError() throws
RemotingCommandException {
+ RemotingCommand request = createPeekMessageRequest("group","topic",17);
+ RemotingCommand response =
peekMessageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ }
+
+ private RemotingCommand createPeekMessageRequest(String group,String
topic,int queueId) {
+ PeekMessageRequestHeader peekMessageRequestHeader = new
PeekMessageRequestHeader();
+ peekMessageRequestHeader.setConsumerGroup(group);
+ peekMessageRequestHeader.setTopic(topic);
+ peekMessageRequestHeader.setMaxMsgNums(10);
+ peekMessageRequestHeader.setQueueId(queueId);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.PEEK_MESSAGE,
peekMessageRequestHeader);
+ request.makeCustomHeaderToNet();
+ return request;
+ }
+}