This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 03c1f11 [ISSUE #2378] FIx `NullPointerException` when Consumer
shutdown in the ClientRemotingProcessor.
03c1f11 is described below
commit 03c1f11d6bc3034daca519de043fd4d2f69bb047
Author: Alvin <[email protected]>
AuthorDate: Sun Nov 1 22:56:47 2020 +0800
[ISSUE #2378] FIx `NullPointerException` when Consumer shutdown in the
ClientRemotingProcessor.
---
.../client/impl/factory/MQClientInstance.java | 3 +++
.../client/impl/factory/MQClientInstanceTest.java | 29 ++++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index b5aaeb8..d40bdc2 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1214,6 +1214,9 @@ public class MQClientInstance {
public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup)
{
MQConsumerInner mqConsumerInner =
this.consumerTable.get(consumerGroup);
+ if (mqConsumerInner == null) {
+ return null;
+ }
ConsumerRunningInfo consumerRunningInfo =
mqConsumerInner.consumerRunningInfo();
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index e0506aa..a3457e1 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl.factory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.ClientConfig;
@@ -29,6 +30,8 @@ import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -41,6 +44,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class MQClientInstanceTest {
@@ -139,6 +143,31 @@ public class MQClientInstanceTest {
assertThat(flag).isTrue();
}
+
+ @Test
+ public void testConsumerRunningInfoWhenConsumersIsEmptyOrNot() throws
RemotingException, InterruptedException, MQBrokerException {
+ MQConsumerInner mockConsumerInner = mock(MQConsumerInner.class);
+ ConsumerRunningInfo mockConsumerRunningInfo =
mock(ConsumerRunningInfo.class);
+
when(mockConsumerInner.consumerRunningInfo()).thenReturn(mockConsumerRunningInfo);
+
when(mockConsumerInner.consumeType()).thenReturn(ConsumeType.CONSUME_PASSIVELY);
+ Properties properties = new Properties();
+ when(mockConsumerRunningInfo.getProperties()).thenReturn(properties);
+ mqClientInstance.unregisterConsumer(group);
+
+ ConsumerRunningInfo runningInfo =
mqClientInstance.consumerRunningInfo(group);
+ assertThat(runningInfo).isNull();
+ boolean flag = mqClientInstance.registerConsumer(group,
mockConsumerInner);
+ assertThat(flag).isTrue();
+
+ runningInfo = mqClientInstance.consumerRunningInfo(group);
+ assertThat(runningInfo).isNotNull();
+
assertThat(mockConsumerInner.consumerRunningInfo().getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE));
+
+ mqClientInstance.unregisterConsumer(group);
+ flag = mqClientInstance.registerConsumer(group,
mock(MQConsumerInner.class));
+ assertThat(flag).isTrue();
+ }
+
@Test
public void testRegisterAdminExt() {
boolean flag = mqClientInstance.registerAdminExt(group,
mock(MQAdminExtInner.class));