This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit b887313bfbcb2d90c6db9757299fd76deb9f103d Author: zhouxiang <[email protected]> AuthorDate: Tue Nov 1 15:14:23 2022 +0800 [ISSUE #5406] Add unit test --- .../activity/AbstractRemotingActivity.java | 1 + .../remoting/activity/GetTopicRouteActivity.java | 1 + .../remoting/activity/PullMessageActivity.java | 15 +- .../activity/AbstractRemotingActivityTest.java | 130 +++++++++++++++- .../remoting/activity/PullMessageActivityTest.java | 165 +++++++++++++++++++++ .../remoting/activity/SendMessageActivityTest.java | 102 +++++++++++++ 6 files changed, 394 insertions(+), 20 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java index 7f0d891ec..54ef7bfa7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivity.java @@ -67,6 +67,7 @@ public abstract class AbstractRemotingActivity implements NettyRequestProcessor } String brokerName = request.getExtFields().get(BROKER_NAME_FIELD); if (request.isOnewayRPC()) { + messagingProcessor.requestOneway(context, brokerName, request, timeoutMillis); return null; } messagingProcessor.request(context, brokerName, request, timeoutMillis) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java index d3b7de98d..26d28bafe 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/GetTopicRouteActivity.java @@ -49,6 +49,7 @@ public class GetTopicRouteActivity extends AbstractRemotingActivity { final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); List<Address> addressList = new ArrayList<>(); + // AddressScheme is just a placeholder and will not affect topic route result in this case. addressList.add(new Address(Address.AddressScheme.IPv4, HostAndPort.fromString(proxyConfig.getRemotingAccessPoint()))); ProxyTopicRouteData proxyTopicRouteData = messagingProcessor.getTopicRouteDataForProxy(context, addressList, requestHeader.getTopic()); TopicRouteData topicRouteData = proxyTopicRouteData.buildTopicRouteData(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java index 819bf139d..873b52460 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivity.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.proxy.remoting.activity; import io.netty.channel.ChannelHandlerContext; import java.time.Duration; -import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; @@ -39,10 +38,6 @@ public class PullMessageActivity extends AbstractRemotingActivity { @Override protected RemotingCommand processRequest0(ChannelHandlerContext ctx, RemotingCommand request, ProxyContext context) throws Exception { - if (request.getExtFields().get(BROKER_NAME_FIELD) == null) { - return RemotingCommand.buildErrorResponse(ResponseCode.VERSION_NOT_SUPPORTED, - "Request doesn't have field bname"); - } PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); if (!PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag())) { ConsumerGroupInfo consumerInfo = messagingProcessor.getConsumerGroupInfo(requestHeader.getConsumerGroup()); @@ -57,16 +52,10 @@ public class PullMessageActivity extends AbstractRemotingActivity { } requestHeader.setSubscription(subscriptionData.getSubString()); requestHeader.setExpressionType(subscriptionData.getExpressionType()); + request.writeCustomHeader(requestHeader); request.makeCustomHeaderToNet(); } - String brokerName = requestHeader.getBname(); long timeoutMillis = requestHeader.getSuspendTimeoutMillis() + Duration.ofSeconds(10).toMillis(); - CompletableFuture<RemotingCommand> future = messagingProcessor.request(context, brokerName, request, timeoutMillis); - future.thenAccept(r -> writeResponse(ctx, context, request, r)) - .exceptionally(t -> { - writeErrResponse(ctx, context, request, t); - return null; - }); - return null; + return request(ctx, request, context, timeoutMillis); } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java index b581d8a91..74eb3cbd8 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/AbstractRemotingActivityTest.java @@ -21,17 +21,24 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.acl.common.AclException; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.common.ProxyException; +import org.apache.rocketmq.proxy.common.ProxyExceptionCode; import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; import org.apache.rocketmq.proxy.processor.MessagingProcessor; import org.apache.rocketmq.proxy.service.channel.SimpleChannel; import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; +import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; @@ -39,7 +46,8 @@ import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -50,7 +58,7 @@ public class AbstractRemotingActivityTest extends InitConfigAndLoggerTest { @Mock MessagingProcessor messagingProcessorMock; @Spy - ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) { + ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "0.0.0.0:0", "1.1.1.1:1")) { @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { return null; @@ -69,16 +77,124 @@ public class AbstractRemotingActivityTest extends InitConfigAndLoggerTest { } @Test - public void request() throws Exception { + public void testCreateContext() throws Exception { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + ProxyContext context = remotingActivity.createContext(ctx, request); + assertThat(context.getLanguage()).isEqualTo(LanguageCode.JAVA.name()); + assertThat(context.getAction()).isEqualTo("Remoting" + RequestCode.PULL_MESSAGE); + } + + @Test + public void testRequest() throws Exception { String brokerName = "broker"; - String remark = "success"; - when(messagingProcessorMock.request(any(), anyString(), any(), anyLong())).thenReturn(CompletableFuture.completedFuture( - RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, remark) + RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "remark"); + when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(CompletableFuture.completedFuture( + response )); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName); RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000); assertThat(remotingCommand).isNull(); - verify(ctx, times(1)).writeAndFlush(any()); + verify(ctx, times(1)).writeAndFlush(response); + } + + @Test + public void testRequestOneway() throws Exception { + String brokerName = "broker"; + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + request.markOnewayRPC(); + request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName); + RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000); + assertThat(remotingCommand).isNull(); + verify(messagingProcessorMock, times(1)).requestOneway(any(), eq(brokerName), any(), anyLong()); + } + + @Test + public void testRequestInvalid() throws Exception { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + request.addExtField("test", "test"); + RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000); + assertThat(remotingCommand.getCode()).isEqualTo(ResponseCode.VERSION_NOT_SUPPORTED); + verify(ctx, never()).writeAndFlush(any()); + } + + @Test + public void testRequestProxyException() throws Exception { + ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class); + String brokerName = "broker"; + String remark = "exception"; + CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); + future.completeExceptionally(new ProxyException(ProxyExceptionCode.FORBIDDEN, remark)); + when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName); + RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000); + assertThat(remotingCommand).isNull(); + verify(ctx, times(1)).writeAndFlush(captor.capture()); + assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.NO_PERMISSION); + } + + @Test + public void testRequestClientException() throws Exception { + ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class); + String brokerName = "broker"; + String remark = "exception"; + CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); + future.completeExceptionally(new MQClientException(remark, null)); + when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName); + RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000); + assertThat(remotingCommand).isNull(); + verify(ctx, times(1)).writeAndFlush(captor.capture()); + assertThat(captor.getValue().getCode()).isEqualTo(-1); + } + + @Test + public void testRequestBrokerException() throws Exception { + ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class); + String brokerName = "broker"; + String remark = "exception"; + CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); + future.completeExceptionally(new MQBrokerException(ResponseCode.FLUSH_DISK_TIMEOUT, remark)); + when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName); + RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000); + assertThat(remotingCommand).isNull(); + verify(ctx, times(1)).writeAndFlush(captor.capture()); + assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.FLUSH_DISK_TIMEOUT); + } + + @Test + public void testRequestAclException() throws Exception { + ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class); + String brokerName = "broker"; + String remark = "exception"; + CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); + future.completeExceptionally(new AclException(remark, ResponseCode.MESSAGE_ILLEGAL)); + when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName); + RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000); + assertThat(remotingCommand).isNull(); + verify(ctx, times(1)).writeAndFlush(captor.capture()); + assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.NO_PERMISSION); + } + + @Test + public void testRequestDefaultException() throws Exception { + ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class); + String brokerName = "broker"; + String remark = "exception"; + CompletableFuture<RemotingCommand> future = new CompletableFuture<>(); + future.completeExceptionally(new Exception(remark)); + when(messagingProcessorMock.request(any(), eq(brokerName), any(), anyLong())).thenReturn(future); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null); + request.addExtField(AbstractRemotingActivity.BROKER_NAME_FIELD, brokerName); + RemotingCommand remotingCommand = remotingActivity.request(ctx, request, null, 10000); + assertThat(remotingCommand).isNull(); + verify(ctx, times(1)).writeAndFlush(captor.capture()); + assertThat(captor.getValue().getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); } } \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java new file mode 100644 index 000000000..ffbe2ffac --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/PullMessageActivityTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.broker.client.ConsumerGroupInfo; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.common.sysflag.PullSysFlag; +import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.service.channel.SimpleChannel; +import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class PullMessageActivityTest extends InitConfigAndLoggerTest { + PullMessageActivity pullMessageActivity; + + @Mock + MessagingProcessor messagingProcessorMock; + @Mock + ConsumerGroupInfo consumerGroupInfoMock; + + String topic = "topic"; + String group = "group"; + String brokerName = "brokerName"; + String subString = "sub"; + String type = "type"; + @Spy + ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) { + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + return null; + } + }; + + @Before + public void setup() throws Exception { + pullMessageActivity = new PullMessageActivity(null, messagingProcessorMock); + } + + @Test + public void testPullMessageWithoutSub() throws Exception { + when(messagingProcessorMock.getConsumerGroupInfo(eq(group))) + .thenReturn(consumerGroupInfoMock); + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setSubString(subString); + subscriptionData.setExpressionType(type); + when(consumerGroupInfoMock.findSubscriptionData(eq(topic))) + .thenReturn(subscriptionData); + + PullMessageRequestHeader header = new PullMessageRequestHeader(); + header.setTopic(topic); + header.setConsumerGroup(group); + header.setQueueId(0); + header.setQueueOffset(0L); + header.setMaxMsgNums(16); + header.setSysFlag(PullSysFlag.buildSysFlag(true, false, false, false)); + header.setCommitOffset(0L); + header.setSuspendTimeoutMillis(1000L); + header.setSubVersion(0L); + header.setBname(brokerName); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, header); + request.makeCustomHeaderToNet(); + RemotingCommand expectResponse = RemotingCommand.createResponseCommand(ResponseCode.NO_MESSAGE, "success"); + PullMessageRequestHeader newHeader = new PullMessageRequestHeader(); + newHeader.setTopic(topic); + newHeader.setConsumerGroup(group); + newHeader.setQueueId(0); + newHeader.setQueueOffset(0L); + newHeader.setMaxMsgNums(16); + newHeader.setSysFlag(PullSysFlag.buildSysFlag(true, false, false, false)); + newHeader.setCommitOffset(0L); + newHeader.setSuspendTimeoutMillis(1000L); + newHeader.setSubVersion(0L); + newHeader.setBname(brokerName); + newHeader.setSubscription(subString); + newHeader.setExpressionType(type); + RemotingCommand matchRequest = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, newHeader); + matchRequest.setOpaque(request.getOpaque()); + matchRequest.makeCustomHeaderToNet(); + + ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class); + when(messagingProcessorMock.request(any(), eq(brokerName), captor.capture(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(expectResponse)); + RemotingCommand response = pullMessageActivity.processRequest0(ctx, request, null); + assertThat(captor.getValue().getExtFields()).isEqualTo(matchRequest.getExtFields()); + assertThat(response).isNull(); + verify(ctx, times(1)).writeAndFlush(eq(expectResponse)); + } + + @Test + public void testPullMessageWithSub() throws Exception { + when(messagingProcessorMock.getConsumerGroupInfo(eq(group))) + .thenReturn(consumerGroupInfoMock); + SubscriptionData subscriptionData = new SubscriptionData(); + subscriptionData.setSubString(subString); + subscriptionData.setExpressionType(type); + when(consumerGroupInfoMock.findSubscriptionData(eq(topic))) + .thenReturn(subscriptionData); + + PullMessageRequestHeader header = new PullMessageRequestHeader(); + header.setTopic(topic); + header.setConsumerGroup(group); + header.setQueueId(0); + header.setQueueOffset(0L); + header.setMaxMsgNums(16); + header.setSysFlag(PullSysFlag.buildSysFlag(true, true, false, false)); + header.setCommitOffset(0L); + header.setSuspendTimeoutMillis(1000L); + header.setSubVersion(0L); + header.setBname(brokerName); + header.setSubscription(subString); + header.setExpressionType(type); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, header); + request.makeCustomHeaderToNet(); + RemotingCommand expectResponse = RemotingCommand.createResponseCommand(ResponseCode.NO_MESSAGE, "success"); + + ArgumentCaptor<RemotingCommand> captor = ArgumentCaptor.forClass(RemotingCommand.class); + when(messagingProcessorMock.request(any(), eq(brokerName), captor.capture(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(expectResponse)); + RemotingCommand response = pullMessageActivity.processRequest0(ctx, request, null); + assertThat(captor.getValue().getExtFields()).isEqualTo(request.getExtFields()); + assertThat(response).isNull(); + verify(ctx, times(1)).writeAndFlush(eq(expectResponse)); + } +} \ No newline at end of file diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java new file mode 100644 index 000000000..e03bc26e0 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/remoting/activity/SendMessageActivityTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.remoting.activity; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.common.attribute.TopicMessageType; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.proxy.config.InitConfigAndLoggerTest; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.service.channel.SimpleChannel; +import org.apache.rocketmq.proxy.service.channel.SimpleChannelHandlerContext; +import org.apache.rocketmq.proxy.service.metadata.MetadataService; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SendMessageActivityTest extends InitConfigAndLoggerTest { + SendMessageActivity sendMessageActivity; + + @Mock + MessagingProcessor messagingProcessorMock; + @Mock + MetadataService metadataServiceMock; + + String topic = "topic"; + String producerGroup = "group"; + String brokerName = "brokerName"; + @Spy + ChannelHandlerContext ctx = new SimpleChannelHandlerContext(new SimpleChannel(null, "1", "2")) { + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + return null; + } + }; + + @Before + public void setup() { + sendMessageActivity = new SendMessageActivity(null, messagingProcessorMock); + when(messagingProcessorMock.getMetadataService()).thenReturn(metadataServiceMock); + } + + @Test + public void testSendMessage() throws Exception { + when(metadataServiceMock.getTopicMessageType(eq(topic))).thenReturn(TopicMessageType.NORMAL); + Message message = new Message(topic, "123".getBytes()); + message.putUserProperty("a", "b"); + SendMessageRequestHeader sendMessageRequestHeader = new SendMessageRequestHeader(); + sendMessageRequestHeader.setTopic(topic); + sendMessageRequestHeader.setProducerGroup(producerGroup); + sendMessageRequestHeader.setDefaultTopic(""); + sendMessageRequestHeader.setDefaultTopicQueueNums(0); + sendMessageRequestHeader.setQueueId(0); + sendMessageRequestHeader.setSysFlag(0); + sendMessageRequestHeader.setBname(brokerName); + sendMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(message.getProperties())); + RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, sendMessageRequestHeader); + remotingCommand.setBody(message.getBody()); + remotingCommand.makeCustomHeaderToNet(); + + RemotingCommand expectResponse = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, "success"); + when(messagingProcessorMock.request(any(), eq(brokerName), eq(remotingCommand), anyLong())) + .thenReturn(CompletableFuture.completedFuture(expectResponse)); + RemotingCommand response = sendMessageActivity.processRequest0(ctx, remotingCommand, null); + assertThat(response).isNull(); + verify(ctx, times(1)).writeAndFlush(eq(expectResponse)); + } +} \ No newline at end of file
