This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 46869c9e9 [ISSUE #5876] fix resource leak in 
DefaultLitePullConsumerTest
46869c9e9 is described below

commit 46869c9e9b44fac006c2d9783ebd5182964a5bd1
Author: SSpirits <[email protected]>
AuthorDate: Fri Jan 27 17:13:02 2023 +0800

    [ISSUE #5876] fix resource leak in DefaultLitePullConsumerTest
---
 .../consumer/DefaultLitePullConsumerTest.java      | 226 ++++++++++++---------
 1 file changed, 127 insertions(+), 99 deletions(-)

diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 5fc4df89c..24e39f566 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -52,9 +52,9 @@ import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -83,6 +83,8 @@ public class DefaultLitePullConsumerTest {
     @Spy
     private MQClientInstance mQClientFactory = 
MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
 
+    private MQClientInstance mqClientInstance;
+
     @Mock
     private MQClientAPIImpl mQClientAPIImpl;
     @Mock
@@ -121,6 +123,14 @@ public class DefaultLitePullConsumerTest {
         field.set(null, true);
     }
 
+    @After
+    public void destroy() {
+        if (mqClientInstance != null) {
+            
mqClientInstance.unregisterConsumer(litePullConsumerImpl.groupName());
+            mqClientInstance.shutdown();
+        }
+    }
+
     @Test
     public void testAssign_PollMessageSuccess() throws Exception {
         DefaultLitePullConsumer litePullConsumer = 
createStartLitePullConsumer();
@@ -154,7 +164,6 @@ public class DefaultLitePullConsumerTest {
         }
     }
 
-
     @Test
     public void testAssign_PollMessageWithTagSuccess() throws Exception {
         DefaultLitePullConsumer litePullConsumer = 
createStartLitePullConsumerWithTag();
@@ -173,33 +182,36 @@ public class DefaultLitePullConsumerTest {
     @Test
     public void testConsumerCommitSyncWithMQOffset() throws Exception {
         DefaultLitePullConsumer litePullConsumer = 
createNotStartLitePullConsumer();
-        RemoteBrokerOffsetStore store = new 
RemoteBrokerOffsetStore(mQClientFactory, consumerGroup);
-        litePullConsumer.setOffsetStore(store);
-        litePullConsumer.start();
-        initDefaultLitePullConsumer(litePullConsumer);
-
-        //replace with real offsetStore.
-        Field offsetStore = 
litePullConsumerImpl.getClass().getDeclaredField("offsetStore");
-        offsetStore.setAccessible(true);
-        offsetStore.set(litePullConsumerImpl, store);
-
-        MessageQueue messageQueue = createMessageQueue();
-        HashSet<MessageQueue> set = new HashSet<>();
-        set.add(messageQueue);
+        try {
+            RemoteBrokerOffsetStore store = new 
RemoteBrokerOffsetStore(mQClientFactory, consumerGroup);
+            litePullConsumer.setOffsetStore(store);
+            litePullConsumer.start();
+            initDefaultLitePullConsumer(litePullConsumer);
 
-        //mock assign and reset offset
-        litePullConsumer.assign(set);
-        litePullConsumer.seek(messageQueue, 0);
-        await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> 
assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(0));
-        //commit offset 1
-        Map<MessageQueue, Long> commitOffset = new HashMap<>();
-        commitOffset.put(messageQueue, 1L);
-        litePullConsumer.commit(commitOffset, true);
+            //replace with real offsetStore.
+            Field offsetStore = 
litePullConsumerImpl.getClass().getDeclaredField("offsetStore");
+            offsetStore.setAccessible(true);
+            offsetStore.set(litePullConsumerImpl, store);
 
-        assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(1);
+            MessageQueue messageQueue = createMessageQueue();
+            HashSet<MessageQueue> set = new HashSet<>();
+            set.add(messageQueue);
+
+            //mock assign and reset offset
+            litePullConsumer.assign(set);
+            litePullConsumer.seek(messageQueue, 0);
+            await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> 
assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(0));
+            //commit offset 1
+            Map<MessageQueue, Long> commitOffset = new HashMap<>();
+            commitOffset.put(messageQueue, 1L);
+            litePullConsumer.commit(commitOffset, true);
+
+            assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(1);
+        } finally {
+            litePullConsumer.shutdown();
+        }
     }
 
-
     @Test
     public void testSubscribe_PollMessageSuccess() throws Exception {
         DefaultLitePullConsumer litePullConsumer = 
createSubscribeLitePullConsumer();
@@ -377,8 +389,12 @@ public class DefaultLitePullConsumerTest {
         }
         doReturn(123L).when(mQAdminImpl).searchOffset(any(MessageQueue.class), 
anyLong());
         litePullConsumer = createStartLitePullConsumer();
-        long offset = litePullConsumer.offsetForTimestamp(messageQueue, 
123456L);
-        assertThat(offset).isEqualTo(123L);
+        try {
+            long offset = litePullConsumer.offsetForTimestamp(messageQueue, 
123456L);
+            assertThat(offset).isEqualTo(123L);
+        } finally {
+            litePullConsumer.shutdown();
+        }
     }
 
     @Test
@@ -452,19 +468,23 @@ public class DefaultLitePullConsumerTest {
     public void testRegisterTopicMessageQueueChangeListener_Success() throws 
Exception {
         flag = false;
         DefaultLitePullConsumer litePullConsumer = 
createStartLitePullConsumer();
-        
doReturn(Collections.emptySet()).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
-        litePullConsumer.setTopicMetadataCheckIntervalMillis(10);
-        litePullConsumer.registerTopicMessageQueueChangeListener(topic, new 
TopicMessageQueueChangeListener() {
-            @Override
-            public void onChanged(String topic, Set<MessageQueue> 
messageQueues) {
-                flag = true;
-            }
-        });
-        Set<MessageQueue> set = new HashSet<>();
-        set.add(createMessageQueue());
-        
doReturn(set).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
-        Thread.sleep(11 * 1000);
-        assertThat(flag).isTrue();
+        try {
+            
doReturn(Collections.emptySet()).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
+            litePullConsumer.setTopicMetadataCheckIntervalMillis(10);
+            litePullConsumer.registerTopicMessageQueueChangeListener(topic, 
new TopicMessageQueueChangeListener() {
+                @Override
+                public void onChanged(String topic, Set<MessageQueue> 
messageQueues) {
+                    flag = true;
+                }
+            });
+            Set<MessageQueue> set = new HashSet<>();
+            set.add(createMessageQueue());
+            
doReturn(set).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
+            Thread.sleep(11 * 1000);
+            assertThat(flag).isTrue();
+        } finally {
+            litePullConsumer.shutdown();
+        }
     }
 
     @Test
@@ -568,11 +588,15 @@ public class DefaultLitePullConsumerTest {
     @Test
     public void testComputePullFromWhereReturnedNotFound() throws Exception {
         DefaultLitePullConsumer defaultLitePullConsumer = 
createStartLitePullConsumer();
-        
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-        MessageQueue messageQueue = createMessageQueue();
-        when(offsetStore.readOffset(any(MessageQueue.class), 
any(ReadOffsetType.class))).thenReturn(-1L);
-        long offset = rebalanceImpl.computePullFromWhere(messageQueue);
-        assertThat(offset).isEqualTo(0);
+        try {
+            
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+            MessageQueue messageQueue = createMessageQueue();
+            when(offsetStore.readOffset(any(MessageQueue.class), 
any(ReadOffsetType.class))).thenReturn(-1L);
+            long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+            assertThat(offset).isEqualTo(0);
+        } finally {
+            defaultLitePullConsumer.shutdown();
+        }
     }
 
     @Test
@@ -583,6 +607,7 @@ public class DefaultLitePullConsumerTest {
         when(offsetStore.readOffset(any(MessageQueue.class), 
any(ReadOffsetType.class))).thenReturn(100L);
         long offset = rebalanceImpl.computePullFromWhere(messageQueue);
         assertThat(offset).isEqualTo(100);
+        defaultLitePullConsumer.shutdown();
     }
 
     @Test
@@ -594,18 +619,23 @@ public class DefaultLitePullConsumerTest {
         
when(mQClientFactory.getMQAdminImpl().maxOffset(any(MessageQueue.class))).thenReturn(100L);
         long offset = rebalanceImpl.computePullFromWhere(messageQueue);
         assertThat(offset).isEqualTo(100);
+        defaultLitePullConsumer.shutdown();
     }
 
     @Test
     public void testComputePullByTimeStamp() throws Exception {
         DefaultLitePullConsumer defaultLitePullConsumer = 
createStartLitePullConsumer();
-        
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
-        defaultLitePullConsumer.setConsumeTimestamp("20191024171201");
-        MessageQueue messageQueue = createMessageQueue();
-        when(offsetStore.readOffset(any(MessageQueue.class), 
any(ReadOffsetType.class))).thenReturn(-1L);
-        
when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class), 
anyLong())).thenReturn(100L);
-        long offset = rebalanceImpl.computePullFromWhere(messageQueue);
-        assertThat(offset).isEqualTo(100);
+        try {
+            
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
+            defaultLitePullConsumer.setConsumeTimestamp("20191024171201");
+            MessageQueue messageQueue = createMessageQueue();
+            when(offsetStore.readOffset(any(MessageQueue.class), 
any(ReadOffsetType.class))).thenReturn(-1L);
+            
when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class), 
anyLong())).thenReturn(100L);
+            long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+            assertThat(offset).isEqualTo(100);
+        } finally {
+            defaultLitePullConsumer.shutdown();
+        }
     }
 
     @Test
@@ -622,51 +652,51 @@ public class DefaultLitePullConsumerTest {
     @Test
     public void testConsumerCommitWithMQ() throws Exception {
         DefaultLitePullConsumer litePullConsumer = 
createNotStartLitePullConsumer();
-        RemoteBrokerOffsetStore store = new 
RemoteBrokerOffsetStore(mQClientFactory, consumerGroup);
-        litePullConsumer.setOffsetStore(store);
-        litePullConsumer.start();
-        initDefaultLitePullConsumer(litePullConsumer);
+        try {
+            RemoteBrokerOffsetStore store = new 
RemoteBrokerOffsetStore(mQClientFactory, consumerGroup);
+            litePullConsumer.setOffsetStore(store);
+            litePullConsumer.start();
+            initDefaultLitePullConsumer(litePullConsumer);
 
-        //replace with real offsetStore.
-        Field offsetStore = 
litePullConsumerImpl.getClass().getDeclaredField("offsetStore");
-        offsetStore.setAccessible(true);
-        offsetStore.set(litePullConsumerImpl, store);
+            //replace with real offsetStore.
+            Field offsetStore = 
litePullConsumerImpl.getClass().getDeclaredField("offsetStore");
+            offsetStore.setAccessible(true);
+            offsetStore.set(litePullConsumerImpl, store);
 
-        MessageQueue messageQueue = createMessageQueue();
-        HashSet<MessageQueue> set = new HashSet<>();
-        set.add(messageQueue);
+            MessageQueue messageQueue = createMessageQueue();
+            HashSet<MessageQueue> set = new HashSet<>();
+            set.add(messageQueue);
 
-        //mock assign and reset offset
-        litePullConsumer.assign(set);
-        litePullConsumer.seek(messageQueue, 0);
+            //mock assign and reset offset
+            litePullConsumer.assign(set);
+            litePullConsumer.seek(messageQueue, 0);
 
-        //commit
-        litePullConsumer.commit(set, true);
+            //commit
+            litePullConsumer.commit(set, true);
 
-        assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(0);
+            assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(0);
+        } finally {
+            litePullConsumer.shutdown();
+        }
     }
 
-
     static class AsyncConsumer {
         public void executeAsync(final DefaultLitePullConsumer consumer) {
-            new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    while (consumer.isRunning()) {
-                        List<MessageExt> poll = consumer.poll(2 * 1000);
-                    }
+            new Thread(() -> {
+                while (consumer.isRunning()) {
+                    consumer.poll(2 * 1000);
                 }
             }).start();
         }
     }
 
     private void initDefaultLitePullConsumer(DefaultLitePullConsumer 
litePullConsumer) throws Exception {
-
         Field field = 
DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
         field.setAccessible(true);
         litePullConsumerImpl = (DefaultLitePullConsumerImpl) 
field.get(litePullConsumer);
         field = 
DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
         field.setAccessible(true);
+        mqClientInstance = (MQClientInstance) field.get(litePullConsumerImpl);
         field.set(litePullConsumerImpl, mQClientFactory);
 
         PullAPIWrapper pullAPIWrapper = 
litePullConsumerImpl.getPullAPIWrapper();
@@ -755,24 +785,24 @@ public class DefaultLitePullConsumerTest {
         field.set(litePullConsumerImpl, offsetStore);
 
         when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), 
any(PullMessageRequestHeader.class),
-                anyLong(), any(CommunicationMode.class), 
nullable(PullCallback.class)))
-                .thenAnswer(new Answer<PullResult>() {
-                    @Override
-                    public PullResult answer(InvocationOnMock mock) throws 
Throwable {
-                        PullMessageRequestHeader requestHeader = 
mock.getArgument(1);
-                        MessageClientExt messageClientExt = new 
MessageClientExt();
-                        messageClientExt.setTopic(topic);
-                        messageClientExt.setTags("tagA");
-                        messageClientExt.setQueueId(0);
-                        messageClientExt.setMsgId("123");
-                        messageClientExt.setBody(new byte[] {'a'});
-                        messageClientExt.setOffsetMsgId("234");
-                        messageClientExt.setBornHost(new 
InetSocketAddress(8080));
-                        messageClientExt.setStoreHost(new 
InetSocketAddress(8080));
-                        PullResult pullResult = 
createPullResult(requestHeader, PullStatus.FOUND, 
Collections.singletonList(messageClientExt));
-                        return pullResult;
-                    }
-                });
+            anyLong(), any(CommunicationMode.class), 
nullable(PullCallback.class)))
+            .thenAnswer(new Answer<PullResult>() {
+                @Override
+                public PullResult answer(InvocationOnMock mock) throws 
Throwable {
+                    PullMessageRequestHeader requestHeader = 
mock.getArgument(1);
+                    MessageClientExt messageClientExt = new MessageClientExt();
+                    messageClientExt.setTopic(topic);
+                    messageClientExt.setTags("tagA");
+                    messageClientExt.setQueueId(0);
+                    messageClientExt.setMsgId("123");
+                    messageClientExt.setBody(new byte[] {'a'});
+                    messageClientExt.setOffsetMsgId("234");
+                    messageClientExt.setBornHost(new InetSocketAddress(8080));
+                    messageClientExt.setStoreHost(new InetSocketAddress(8080));
+                    PullResult pullResult = createPullResult(requestHeader, 
PullStatus.FOUND, Collections.singletonList(messageClientExt));
+                    return pullResult;
+                }
+            });
 
         when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), 
anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", 
false));
 
@@ -815,7 +845,6 @@ public class DefaultLitePullConsumerTest {
         return litePullConsumer;
     }
 
-
     private DefaultLitePullConsumer createStartLitePullConsumerWithTag() 
throws Exception {
         DefaultLitePullConsumer litePullConsumer = new 
DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
         litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
@@ -859,13 +888,12 @@ public class DefaultLitePullConsumerTest {
         return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + 
messageExtList.size(), 123, 2048, messageExtList, 0, 
outputStream.toByteArray());
     }
 
-    private static void suppressUpdateTopicRouteInfoFromNameServer(
+    private void suppressUpdateTopicRouteInfoFromNameServer(
         DefaultLitePullConsumer litePullConsumer) throws 
IllegalAccessException {
-        DefaultLitePullConsumerImpl defaultLitePullConsumerImpl = 
(DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer, 
"defaultLitePullConsumerImpl", true);
         if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
             litePullConsumer.changeInstanceNameToPID();
         }
-        MQClientInstance mQClientFactory = 
spy(MQClientManager.getInstance().getOrCreateMQClientInstance(litePullConsumer, 
(RPCHook) FieldUtils.readDeclaredField(defaultLitePullConsumerImpl, "rpcHook", 
true)));
+
         ConcurrentMap<String, MQClientInstance> factoryTable = 
(ConcurrentMap<String, MQClientInstance>) 
FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", 
true);
         factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory);
         
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());

Reply via email to