This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 46869c9e9 [ISSUE #5876] fix resource leak in DefaultLitePullConsumerTest
46869c9e9 is described below
commit 46869c9e9b44fac006c2d9783ebd5182964a5bd1
Author: SSpirits <[email protected]>
AuthorDate: Fri Jan 27 17:13:02 2023 +0800
[ISSUE #5876] fix resource leak in DefaultLitePullConsumerTest
---
.../consumer/DefaultLitePullConsumerTest.java | 226 ++++++++++++---------
1 file changed, 127 insertions(+), 99 deletions(-)
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 5fc4df89c..24e39f566 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -52,9 +52,9 @@ import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -83,6 +83,8 @@ public class DefaultLitePullConsumerTest {
@Spy
private MQClientInstance mQClientFactory =
MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
+ private MQClientInstance mqClientInstance;
+
@Mock
private MQClientAPIImpl mQClientAPIImpl;
@Mock
@@ -121,6 +123,14 @@ public class DefaultLitePullConsumerTest {
field.set(null, true);
}
+ @After
+ public void destroy() {
+ if (mqClientInstance != null) {
+
mqClientInstance.unregisterConsumer(litePullConsumerImpl.groupName());
+ mqClientInstance.shutdown();
+ }
+ }
+
@Test
public void testAssign_PollMessageSuccess() throws Exception {
DefaultLitePullConsumer litePullConsumer =
createStartLitePullConsumer();
@@ -154,7 +164,6 @@ public class DefaultLitePullConsumerTest {
}
}
-
@Test
public void testAssign_PollMessageWithTagSuccess() throws Exception {
DefaultLitePullConsumer litePullConsumer =
createStartLitePullConsumerWithTag();
@@ -173,33 +182,36 @@ public class DefaultLitePullConsumerTest {
@Test
public void testConsumerCommitSyncWithMQOffset() throws Exception {
DefaultLitePullConsumer litePullConsumer =
createNotStartLitePullConsumer();
- RemoteBrokerOffsetStore store = new
RemoteBrokerOffsetStore(mQClientFactory, consumerGroup);
- litePullConsumer.setOffsetStore(store);
- litePullConsumer.start();
- initDefaultLitePullConsumer(litePullConsumer);
-
- //replace with real offsetStore.
- Field offsetStore =
litePullConsumerImpl.getClass().getDeclaredField("offsetStore");
- offsetStore.setAccessible(true);
- offsetStore.set(litePullConsumerImpl, store);
-
- MessageQueue messageQueue = createMessageQueue();
- HashSet<MessageQueue> set = new HashSet<>();
- set.add(messageQueue);
+ try {
+ RemoteBrokerOffsetStore store = new
RemoteBrokerOffsetStore(mQClientFactory, consumerGroup);
+ litePullConsumer.setOffsetStore(store);
+ litePullConsumer.start();
+ initDefaultLitePullConsumer(litePullConsumer);
- //mock assign and reset offset
- litePullConsumer.assign(set);
- litePullConsumer.seek(messageQueue, 0);
- await().atMost(Duration.ofSeconds(5)).untilAsserted(() ->
assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(0));
- //commit offset 1
- Map<MessageQueue, Long> commitOffset = new HashMap<>();
- commitOffset.put(messageQueue, 1L);
- litePullConsumer.commit(commitOffset, true);
+ //replace with real offsetStore.
+ Field offsetStore =
litePullConsumerImpl.getClass().getDeclaredField("offsetStore");
+ offsetStore.setAccessible(true);
+ offsetStore.set(litePullConsumerImpl, store);
- assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(1);
+ MessageQueue messageQueue = createMessageQueue();
+ HashSet<MessageQueue> set = new HashSet<>();
+ set.add(messageQueue);
+
+ //mock assign and reset offset
+ litePullConsumer.assign(set);
+ litePullConsumer.seek(messageQueue, 0);
+ await().atMost(Duration.ofSeconds(5)).untilAsserted(() ->
assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(0));
+ //commit offset 1
+ Map<MessageQueue, Long> commitOffset = new HashMap<>();
+ commitOffset.put(messageQueue, 1L);
+ litePullConsumer.commit(commitOffset, true);
+
+ assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(1);
+ } finally {
+ litePullConsumer.shutdown();
+ }
}
-
@Test
public void testSubscribe_PollMessageSuccess() throws Exception {
DefaultLitePullConsumer litePullConsumer =
createSubscribeLitePullConsumer();
@@ -377,8 +389,12 @@ public class DefaultLitePullConsumerTest {
}
doReturn(123L).when(mQAdminImpl).searchOffset(any(MessageQueue.class),
anyLong());
litePullConsumer = createStartLitePullConsumer();
- long offset = litePullConsumer.offsetForTimestamp(messageQueue,
123456L);
- assertThat(offset).isEqualTo(123L);
+ try {
+ long offset = litePullConsumer.offsetForTimestamp(messageQueue,
123456L);
+ assertThat(offset).isEqualTo(123L);
+ } finally {
+ litePullConsumer.shutdown();
+ }
}
@Test
@@ -452,19 +468,23 @@ public class DefaultLitePullConsumerTest {
public void testRegisterTopicMessageQueueChangeListener_Success() throws
Exception {
flag = false;
DefaultLitePullConsumer litePullConsumer =
createStartLitePullConsumer();
-
doReturn(Collections.emptySet()).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
- litePullConsumer.setTopicMetadataCheckIntervalMillis(10);
- litePullConsumer.registerTopicMessageQueueChangeListener(topic, new
TopicMessageQueueChangeListener() {
- @Override
- public void onChanged(String topic, Set<MessageQueue>
messageQueues) {
- flag = true;
- }
- });
- Set<MessageQueue> set = new HashSet<>();
- set.add(createMessageQueue());
-
doReturn(set).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
- Thread.sleep(11 * 1000);
- assertThat(flag).isTrue();
+ try {
+
doReturn(Collections.emptySet()).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
+ litePullConsumer.setTopicMetadataCheckIntervalMillis(10);
+ litePullConsumer.registerTopicMessageQueueChangeListener(topic,
new TopicMessageQueueChangeListener() {
+ @Override
+ public void onChanged(String topic, Set<MessageQueue>
messageQueues) {
+ flag = true;
+ }
+ });
+ Set<MessageQueue> set = new HashSet<>();
+ set.add(createMessageQueue());
+
doReturn(set).when(mQAdminImpl).fetchSubscribeMessageQueues(anyString());
+ Thread.sleep(11 * 1000);
+ assertThat(flag).isTrue();
+ } finally {
+ litePullConsumer.shutdown();
+ }
}
@Test
@@ -568,11 +588,15 @@ public class DefaultLitePullConsumerTest {
@Test
public void testComputePullFromWhereReturnedNotFound() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer =
createStartLitePullConsumer();
-
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- MessageQueue messageQueue = createMessageQueue();
- when(offsetStore.readOffset(any(MessageQueue.class),
any(ReadOffsetType.class))).thenReturn(-1L);
- long offset = rebalanceImpl.computePullFromWhere(messageQueue);
- assertThat(offset).isEqualTo(0);
+ try {
+
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ MessageQueue messageQueue = createMessageQueue();
+ when(offsetStore.readOffset(any(MessageQueue.class),
any(ReadOffsetType.class))).thenReturn(-1L);
+ long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+ assertThat(offset).isEqualTo(0);
+ } finally {
+ defaultLitePullConsumer.shutdown();
+ }
}
@Test
@@ -583,6 +607,7 @@ public class DefaultLitePullConsumerTest {
when(offsetStore.readOffset(any(MessageQueue.class),
any(ReadOffsetType.class))).thenReturn(100L);
long offset = rebalanceImpl.computePullFromWhere(messageQueue);
assertThat(offset).isEqualTo(100);
+ defaultLitePullConsumer.shutdown();
}
@Test
@@ -594,18 +619,23 @@ public class DefaultLitePullConsumerTest {
when(mQClientFactory.getMQAdminImpl().maxOffset(any(MessageQueue.class))).thenReturn(100L);
long offset = rebalanceImpl.computePullFromWhere(messageQueue);
assertThat(offset).isEqualTo(100);
+ defaultLitePullConsumer.shutdown();
}
@Test
public void testComputePullByTimeStamp() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer =
createStartLitePullConsumer();
-
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
- defaultLitePullConsumer.setConsumeTimestamp("20191024171201");
- MessageQueue messageQueue = createMessageQueue();
- when(offsetStore.readOffset(any(MessageQueue.class),
any(ReadOffsetType.class))).thenReturn(-1L);
-
when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class),
anyLong())).thenReturn(100L);
- long offset = rebalanceImpl.computePullFromWhere(messageQueue);
- assertThat(offset).isEqualTo(100);
+ try {
+
defaultLitePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
+ defaultLitePullConsumer.setConsumeTimestamp("20191024171201");
+ MessageQueue messageQueue = createMessageQueue();
+ when(offsetStore.readOffset(any(MessageQueue.class),
any(ReadOffsetType.class))).thenReturn(-1L);
+
when(mQClientFactory.getMQAdminImpl().searchOffset(any(MessageQueue.class),
anyLong())).thenReturn(100L);
+ long offset = rebalanceImpl.computePullFromWhere(messageQueue);
+ assertThat(offset).isEqualTo(100);
+ } finally {
+ defaultLitePullConsumer.shutdown();
+ }
}
@Test
@@ -622,51 +652,51 @@ public class DefaultLitePullConsumerTest {
@Test
public void testConsumerCommitWithMQ() throws Exception {
DefaultLitePullConsumer litePullConsumer =
createNotStartLitePullConsumer();
- RemoteBrokerOffsetStore store = new
RemoteBrokerOffsetStore(mQClientFactory, consumerGroup);
- litePullConsumer.setOffsetStore(store);
- litePullConsumer.start();
- initDefaultLitePullConsumer(litePullConsumer);
+ try {
+ RemoteBrokerOffsetStore store = new
RemoteBrokerOffsetStore(mQClientFactory, consumerGroup);
+ litePullConsumer.setOffsetStore(store);
+ litePullConsumer.start();
+ initDefaultLitePullConsumer(litePullConsumer);
- //replace with real offsetStore.
- Field offsetStore =
litePullConsumerImpl.getClass().getDeclaredField("offsetStore");
- offsetStore.setAccessible(true);
- offsetStore.set(litePullConsumerImpl, store);
+ //replace with real offsetStore.
+ Field offsetStore =
litePullConsumerImpl.getClass().getDeclaredField("offsetStore");
+ offsetStore.setAccessible(true);
+ offsetStore.set(litePullConsumerImpl, store);
- MessageQueue messageQueue = createMessageQueue();
- HashSet<MessageQueue> set = new HashSet<>();
- set.add(messageQueue);
+ MessageQueue messageQueue = createMessageQueue();
+ HashSet<MessageQueue> set = new HashSet<>();
+ set.add(messageQueue);
- //mock assign and reset offset
- litePullConsumer.assign(set);
- litePullConsumer.seek(messageQueue, 0);
+ //mock assign and reset offset
+ litePullConsumer.assign(set);
+ litePullConsumer.seek(messageQueue, 0);
- //commit
- litePullConsumer.commit(set, true);
+ //commit
+ litePullConsumer.commit(set, true);
- assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(0);
+ assertThat(litePullConsumer.committed(messageQueue)).isEqualTo(0);
+ } finally {
+ litePullConsumer.shutdown();
+ }
}
-
static class AsyncConsumer {
public void executeAsync(final DefaultLitePullConsumer consumer) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- while (consumer.isRunning()) {
- List<MessageExt> poll = consumer.poll(2 * 1000);
- }
+ new Thread(() -> {
+ while (consumer.isRunning()) {
+ consumer.poll(2 * 1000);
}
}).start();
}
}
private void initDefaultLitePullConsumer(DefaultLitePullConsumer
litePullConsumer) throws Exception {
-
Field field =
DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
field.setAccessible(true);
litePullConsumerImpl = (DefaultLitePullConsumerImpl)
field.get(litePullConsumer);
field =
DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
+ mqClientInstance = (MQClientInstance) field.get(litePullConsumerImpl);
field.set(litePullConsumerImpl, mQClientFactory);
PullAPIWrapper pullAPIWrapper =
litePullConsumerImpl.getPullAPIWrapper();
@@ -755,24 +785,24 @@ public class DefaultLitePullConsumerTest {
field.set(litePullConsumerImpl, offsetStore);
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(),
any(PullMessageRequestHeader.class),
- anyLong(), any(CommunicationMode.class),
nullable(PullCallback.class)))
- .thenAnswer(new Answer<PullResult>() {
- @Override
- public PullResult answer(InvocationOnMock mock) throws
Throwable {
- PullMessageRequestHeader requestHeader =
mock.getArgument(1);
- MessageClientExt messageClientExt = new
MessageClientExt();
- messageClientExt.setTopic(topic);
- messageClientExt.setTags("tagA");
- messageClientExt.setQueueId(0);
- messageClientExt.setMsgId("123");
- messageClientExt.setBody(new byte[] {'a'});
- messageClientExt.setOffsetMsgId("234");
- messageClientExt.setBornHost(new
InetSocketAddress(8080));
- messageClientExt.setStoreHost(new
InetSocketAddress(8080));
- PullResult pullResult =
createPullResult(requestHeader, PullStatus.FOUND,
Collections.singletonList(messageClientExt));
- return pullResult;
- }
- });
+ anyLong(), any(CommunicationMode.class),
nullable(PullCallback.class)))
+ .thenAnswer(new Answer<PullResult>() {
+ @Override
+ public PullResult answer(InvocationOnMock mock) throws
Throwable {
+ PullMessageRequestHeader requestHeader =
mock.getArgument(1);
+ MessageClientExt messageClientExt = new MessageClientExt();
+ messageClientExt.setTopic(topic);
+ messageClientExt.setTags("tagA");
+ messageClientExt.setQueueId(0);
+ messageClientExt.setMsgId("123");
+ messageClientExt.setBody(new byte[] {'a'});
+ messageClientExt.setOffsetMsgId("234");
+ messageClientExt.setBornHost(new InetSocketAddress(8080));
+ messageClientExt.setStoreHost(new InetSocketAddress(8080));
+ PullResult pullResult = createPullResult(requestHeader,
PullStatus.FOUND, Collections.singletonList(messageClientExt));
+ return pullResult;
+ }
+ });
when(mQClientFactory.findBrokerAddressInSubscribe(anyString(),
anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911",
false));
@@ -815,7 +845,6 @@ public class DefaultLitePullConsumerTest {
return litePullConsumer;
}
-
private DefaultLitePullConsumer createStartLitePullConsumerWithTag()
throws Exception {
DefaultLitePullConsumer litePullConsumer = new
DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
@@ -859,13 +888,12 @@ public class DefaultLitePullConsumerTest {
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() +
messageExtList.size(), 123, 2048, messageExtList, 0,
outputStream.toByteArray());
}
- private static void suppressUpdateTopicRouteInfoFromNameServer(
+ private void suppressUpdateTopicRouteInfoFromNameServer(
DefaultLitePullConsumer litePullConsumer) throws
IllegalAccessException {
- DefaultLitePullConsumerImpl defaultLitePullConsumerImpl =
(DefaultLitePullConsumerImpl) FieldUtils.readDeclaredField(litePullConsumer,
"defaultLitePullConsumerImpl", true);
if (litePullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
litePullConsumer.changeInstanceNameToPID();
}
- MQClientInstance mQClientFactory =
spy(MQClientManager.getInstance().getOrCreateMQClientInstance(litePullConsumer,
(RPCHook) FieldUtils.readDeclaredField(defaultLitePullConsumerImpl, "rpcHook",
true)));
+
ConcurrentMap<String, MQClientInstance> factoryTable =
(ConcurrentMap<String, MQClientInstance>)
FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable",
true);
factoryTable.put(litePullConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());