This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2ed4ba23e0 nly initialize all the variables once to speed up test 
ConsumeMessageConcurrentlyServiceTest (#8436)
2ed4ba23e0 is described below

commit 2ed4ba23e0448d5a7a4f68c29d0f9bb7a6d30ee3
Author: TestBoost <[email protected]>
AuthorDate: Thu Aug 1 02:17:57 2024 -0500

    nly initialize all the variables once to speed up test 
ConsumeMessageConcurrentlyServiceTest (#8436)
---
 .../ConsumeMessageConcurrentlyServiceTest.java     | 123 +++++++++------------
 1 file changed, 54 insertions(+), 69 deletions(-)

diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
index 749201e3c2..395c0ff233 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyServiceTest.java
@@ -52,15 +52,14 @@ import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.protocol.body.ConsumeStatus;
 import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
-
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -70,49 +69,54 @@ import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
+import org.mockito.Mockito;
 
 @RunWith(MockitoJUnitRunner.class)
 public class ConsumeMessageConcurrentlyServiceTest {
-    private String consumerGroup;
-    private String topic = "FooBar";
-    private String brokerName = "BrokerA";
-    private MQClientInstance mQClientFactory;
+
+    private static String consumerGroup;
+
+    private static String topic = "FooBar";
+
+    private static String brokerName = "BrokerA";
+
+    private static MQClientInstance mQClientFactory;
 
     @Mock
-    private MQClientAPIImpl mQClientAPIImpl;
-    private PullAPIWrapper pullAPIWrapper;
-    private RebalancePushImpl rebalancePushImpl;
-    private DefaultMQPushConsumer pushConsumer;
+    private static MQClientAPIImpl mQClientAPIImpl;
+
+    private static PullAPIWrapper pullAPIWrapper;
+
+    private static RebalancePushImpl rebalancePushImpl;
+
+    private static DefaultMQPushConsumer pushConsumer;
 
-    @Before
-    public void init() throws Exception {
+    @BeforeClass
+    public static void init() throws Exception {
+        mQClientAPIImpl = Mockito.mock(MQClientAPIImpl.class);
         ConcurrentMap<String, MQClientInstance> factoryTable = 
(ConcurrentMap<String, MQClientInstance>) 
FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", 
true);
         Collection<MQClientInstance> instances = factoryTable.values();
         for (MQClientInstance instance : instances) {
             instance.shutdown();
         }
         factoryTable.clear();
-
         consumerGroup = "FooBarGroup" + System.currentTimeMillis();
         pushConsumer = new DefaultMQPushConsumer(consumerGroup);
         pushConsumer.setNamesrvAddr("127.0.0.1:9876");
         pushConsumer.setPullInterval(60 * 1000);
-
         pushConsumer.registerMessageListener(new MessageListenerConcurrently() 
{
+
             @Override
-            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs,
-                                                            
ConsumeConcurrentlyContext context) {
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs, ConsumeConcurrentlyContext context) {
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
-
         DefaultMQPushConsumerImpl pushConsumerImpl = 
pushConsumer.getDefaultMQPushConsumerImpl();
         rebalancePushImpl = spy(new 
RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
         Field field = 
DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
         field.setAccessible(true);
         field.set(pushConsumerImpl, rebalancePushImpl);
         pushConsumer.subscribe(topic, "*");
-
         // suppress updateTopicRouteInfoFromNameServer
         pushConsumer.changeInstanceNameToPID();
         mQClientFactory = 
MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, 
(RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true));
@@ -121,38 +125,32 @@ public class ConsumeMessageConcurrentlyServiceTest {
         field.setAccessible(true);
         field.set(pushConsumerImpl, mQClientFactory);
         factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
-
         field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
         field.setAccessible(true);
         field.set(mQClientFactory, mQClientAPIImpl);
-
         pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, 
consumerGroup, false));
         field = 
DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
         field.setAccessible(true);
         field.set(pushConsumerImpl, pullAPIWrapper);
-
         
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
+        when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), 
any(PullMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), 
nullable(PullCallback.class))).thenAnswer(new Answer<PullResult>() {
 
-        when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), 
any(PullMessageRequestHeader.class),
-                anyLong(), any(CommunicationMode.class), 
nullable(PullCallback.class)))
-                .thenAnswer(new Answer<PullResult>() {
-                    @Override
-                    public PullResult answer(InvocationOnMock mock) throws 
Throwable {
-                        PullMessageRequestHeader requestHeader = 
mock.getArgument(1);
-                        MessageClientExt messageClientExt = new 
MessageClientExt();
-                        messageClientExt.setTopic(topic);
-                        messageClientExt.setQueueId(0);
-                        messageClientExt.setMsgId("123");
-                        messageClientExt.setBody(new byte[] {'a'});
-                        messageClientExt.setOffsetMsgId("234");
-                        messageClientExt.setBornHost(new 
InetSocketAddress(8080));
-                        messageClientExt.setStoreHost(new 
InetSocketAddress(8080));
-                        PullResult pullResult = 
createPullResult(requestHeader, PullStatus.FOUND, 
Collections.<MessageExt>singletonList(messageClientExt));
-                        ((PullCallback) 
mock.getArgument(4)).onSuccess(pullResult);
-                        return pullResult;
-                    }
-                });
-
+            @Override
+            public PullResult answer(InvocationOnMock mock) throws Throwable {
+                PullMessageRequestHeader requestHeader = mock.getArgument(1);
+                MessageClientExt messageClientExt = new MessageClientExt();
+                messageClientExt.setTopic(topic);
+                messageClientExt.setQueueId(0);
+                messageClientExt.setMsgId("123");
+                messageClientExt.setBody(new byte[] { 'a' });
+                messageClientExt.setOffsetMsgId("234");
+                messageClientExt.setBornHost(new InetSocketAddress(8080));
+                messageClientExt.setStoreHost(new InetSocketAddress(8080));
+                PullResult pullResult = createPullResult(requestHeader, 
PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+                ((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
+                return pullResult;
+            }
+        });
         doReturn(new FindBrokerResult("127.0.0.1:10912", 
false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), 
anyLong(), anyBoolean());
         
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
         Set<MessageQueue> messageQueueSet = new HashSet<>();
@@ -162,54 +160,45 @@ public class ConsumeMessageConcurrentlyServiceTest {
     }
 
     @Test
-    public void testPullMessage_ConsumeSuccess() throws InterruptedException, 
RemotingException, MQBrokerException, NoSuchFieldException,Exception {
+    public void testPullMessage_ConsumeSuccess() throws InterruptedException, 
RemotingException, MQBrokerException, NoSuchFieldException, Exception {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         final AtomicReference<MessageExt> messageAtomic = new 
AtomicReference<>();
+        ConsumeMessageConcurrentlyService normalServie = new 
ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), 
new MessageListenerConcurrently() {
 
-        ConsumeMessageConcurrentlyService  normalServie = new 
ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), 
new MessageListenerConcurrently() {
             @Override
-            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs,
-                                                            
ConsumeConcurrentlyContext context) {
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs, ConsumeConcurrentlyContext context) {
                 messageAtomic.set(msgs.get(0));
                 countDownLatch.countDown();
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
         
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(normalServie);
-
         PullMessageService pullMessageService = 
mQClientFactory.getPullMessageService();
         pullMessageService.executePullRequestImmediately(createPullRequest());
         countDownLatch.await();
-
         Thread.sleep(1000);
-
-        ConsumeStatus stats = 
normalServie.getConsumerStatsManager().consumeStatus(pushConsumer.getDefaultMQPushConsumerImpl().groupName(),topic);
-
-        ConsumerStatsManager mgr  =   normalServie.getConsumerStatsManager();
-
+        ConsumeStatus stats = 
normalServie.getConsumerStatsManager().consumeStatus(pushConsumer.getDefaultMQPushConsumerImpl().groupName(),
 topic);
+        ConsumerStatsManager mgr = normalServie.getConsumerStatsManager();
         Field statItmeSetField = 
mgr.getClass().getDeclaredField("topicAndGroupConsumeOKTPS");
         statItmeSetField.setAccessible(true);
-
-        StatsItemSet itemSet = (StatsItemSet)statItmeSetField.get(mgr);
+        StatsItemSet itemSet = (StatsItemSet) statItmeSetField.get(mgr);
         StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" + 
pushConsumer.getDefaultMQPushConsumerImpl().groupName());
-
         assertThat(item.getValue().sum()).isGreaterThan(0L);
         MessageExt msg = messageAtomic.get();
         assertThat(msg).isNotNull();
         assertThat(msg.getTopic()).isEqualTo(topic);
-        assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
+        assertThat(msg.getBody()).isEqualTo(new byte[] { 'a' });
     }
 
-    @After
-    public void terminate() {
+    @AfterClass
+    public static void terminate() {
         pushConsumer.shutdown();
     }
 
-    private PullRequest createPullRequest() {
+    private static PullRequest createPullRequest() {
         PullRequest pullRequest = new PullRequest();
         pullRequest.setConsumerGroup(consumerGroup);
         pullRequest.setNextOffset(1024);
-
         MessageQueue messageQueue = new MessageQueue();
         messageQueue.setBrokerName(brokerName);
         messageQueue.setQueueId(0);
@@ -219,12 +208,10 @@ public class ConsumeMessageConcurrentlyServiceTest {
         processQueue.setLocked(true);
         processQueue.setLastLockTimestamp(System.currentTimeMillis());
         pullRequest.setProcessQueue(processQueue);
-
         return pullRequest;
     }
 
-    private PullResultExt createPullResult(PullMessageRequestHeader 
requestHeader, PullStatus pullStatus,
-                                           List<MessageExt> messageExtList) 
throws Exception {
+    private static PullResultExt createPullResult(PullMessageRequestHeader 
requestHeader, PullStatus pullStatus, List<MessageExt> messageExtList) throws 
Exception {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         for (MessageExt messageExt : messageExtList) {
             outputStream.write(MessageDecoder.encode(messageExt, false));
@@ -236,23 +223,21 @@ public class ConsumeMessageConcurrentlyServiceTest {
     public void testConsumeThreadName() throws Exception {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         final AtomicReference<String> consumeThreadName = new 
AtomicReference<>();
-
         StringBuilder consumeGroup2 = new StringBuilder();
         for (int i = 0; i < 101; i++) {
             consumeGroup2.append(i).append("#");
         }
         pushConsumer.setConsumerGroup(consumeGroup2.toString());
-        ConsumeMessageConcurrentlyService  normalServie = new 
ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), 
new MessageListenerConcurrently() {
+        ConsumeMessageConcurrentlyService normalServie = new 
ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), 
new MessageListenerConcurrently() {
+
             @Override
-            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs,
-                                                            
ConsumeConcurrentlyContext context) {
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs, ConsumeConcurrentlyContext context) {
                 consumeThreadName.set(Thread.currentThread().getName());
                 countDownLatch.countDown();
                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
             }
         });
         
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(normalServie);
-
         PullMessageService pullMessageService = 
mQClientFactory.getPullMessageService();
         pullMessageService.executePullRequestImmediately(createPullRequest());
         countDownLatch.await();

Reply via email to