This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 03c1f11  [ISSUE #2378] FIx `NullPointerException` when Consumer 
shutdown in the ClientRemotingProcessor.
03c1f11 is described below

commit 03c1f11d6bc3034daca519de043fd4d2f69bb047
Author: Alvin <[email protected]>
AuthorDate: Sun Nov 1 22:56:47 2020 +0800

    [ISSUE #2378] FIx `NullPointerException` when Consumer shutdown in the 
ClientRemotingProcessor.
---
 .../client/impl/factory/MQClientInstance.java      |  3 +++
 .../client/impl/factory/MQClientInstanceTest.java  | 29 ++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index b5aaeb8..d40bdc2 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1214,6 +1214,9 @@ public class MQClientInstance {
 
     public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) 
{
         MQConsumerInner mqConsumerInner = 
this.consumerTable.get(consumerGroup);
+        if (mqConsumerInner == null) {
+            return null;
+        }
 
         ConsumerRunningInfo consumerRunningInfo = 
mqConsumerInner.consumerRunningInfo();
 
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index e0506aa..a3457e1 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl.factory;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.client.ClientConfig;
@@ -29,6 +30,8 @@ import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -41,6 +44,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MQClientInstanceTest {
@@ -139,6 +143,31 @@ public class MQClientInstanceTest {
         assertThat(flag).isTrue();
     }
 
+
+    @Test
+    public void testConsumerRunningInfoWhenConsumersIsEmptyOrNot() throws 
RemotingException, InterruptedException, MQBrokerException {
+        MQConsumerInner mockConsumerInner = mock(MQConsumerInner.class);
+        ConsumerRunningInfo mockConsumerRunningInfo = 
mock(ConsumerRunningInfo.class);
+        
when(mockConsumerInner.consumerRunningInfo()).thenReturn(mockConsumerRunningInfo);
+        
when(mockConsumerInner.consumeType()).thenReturn(ConsumeType.CONSUME_PASSIVELY);
+        Properties properties = new Properties();
+        when(mockConsumerRunningInfo.getProperties()).thenReturn(properties);
+        mqClientInstance.unregisterConsumer(group);
+
+        ConsumerRunningInfo runningInfo = 
mqClientInstance.consumerRunningInfo(group);
+        assertThat(runningInfo).isNull();
+        boolean flag = mqClientInstance.registerConsumer(group, 
mockConsumerInner);
+        assertThat(flag).isTrue();
+
+        runningInfo = mqClientInstance.consumerRunningInfo(group);
+        assertThat(runningInfo).isNotNull();
+        
assertThat(mockConsumerInner.consumerRunningInfo().getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE));
+
+        mqClientInstance.unregisterConsumer(group);
+        flag = mqClientInstance.registerConsumer(group, 
mock(MQConsumerInner.class));
+        assertThat(flag).isTrue();
+    }
+
     @Test
     public void testRegisterAdminExt() {
         boolean flag = mqClientInstance.registerAdminExt(group, 
mock(MQAdminExtInner.class));

Reply via email to