mm23504570 opened a new issue #2378:
URL: https://github.com/apache/rocketmq/issues/2378
**BUG REPORT**
1. Please describe the issue you observed:
A `NullPointerException` is thrown when a Consumer shuts down and the Console
invokes `consumerRunningInfo`.
2. Please tell us about your environment:
RocketMQ versions 4.7.1 and 4.5.2.
3. Other information (e.g. detailed explanation, logs, related issues,
suggestions how to fix, etc):
logs:
```
2020-10-28 12:04:05,005 ERROR RocketmqRemoting - process request exception
java.lang.NullPointerException
    at org.apache.rocketmq.client.impl.factory.MQClientInstance.consumerRunningInfo(MQClientInstance.java:1213)
    at org.apache.rocketmq.client.impl.ClientRemotingProcessor.getConsumerRunningInfo(ClientRemotingProcessor.java:185)
    at org.apache.rocketmq.client.impl.ClientRemotingProcessor.processRequest(ClientRemotingProcessor.java:81)
    at org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor.asyncProcessRequest(AsyncNettyRequestProcessor.java:26)
    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$1.run(NettyRemotingAbstract.java:225)
    at org.apache.rocketmq.remoting.netty.RequestTask.run(RequestTask.java:80)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
demo
``` java
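import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException,
            MQClientException, RemotingException {
        // Thread 1: start a push consumer, let it run for a second, shut it down, repeat.
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    DefaultMQPushConsumer consumer =
                            new DefaultMQPushConsumer("please_rename_unique_group_name_4");
                    String namesrvAddr = "10.2.0.6:9876";
                    consumer.setNamesrvAddr(namesrvAddr);
                    consumer.setInstanceName("v2");
                    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                    try {
                        consumer.subscribe("TopicTest", "*");
                    } catch (MQClientException e) {
                        e.printStackTrace();
                    }
                    consumer.registerMessageListener(new MessageListenerConcurrently() {
                        @Override
                        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                ConsumeConcurrentlyContext context) {
                            System.out.printf("%s Receive New Messages: %s %n",
                                    Thread.currentThread().getName(), msgs);
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    });
                    try {
                        consumer.start();
                    } catch (MQClientException e) {
                        e.printStackTrace();
                    }
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    consumer.shutdown();
                }
            }
        }).start();

        final DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
        defaultMQAdminExt.setNamesrvAddr("10.2.0.6:9876");
        defaultMQAdminExt.setInstanceName("another");
        defaultMQAdminExt.start();

        // Thread 2: keep requesting the consumer's running info, as the Console does.
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        ConsumerRunningInfo consumerRunningInfo =
                                defaultMQAdminExt.getConsumerRunningInfo(
                                        "please_rename_unique_group_name_4",
                                        "10.5.129.69@v2", true);
                    } catch (RemotingException e) {
                        // ignored: the NPE shows up in the consumer-side log
                    } catch (MQClientException e) {
                        // ignored
                    } catch (InterruptedException e) {
                        // ignored
                    }
                }
            }
        }).start();

        Thread.sleep(100000);
    }
}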
```
suggestion:
``` java
public class MQClientInstance {
    public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) {
        MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
        // validate
        if (mqConsumerInner == null) {
            return null;
        }
        ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo();
        .....
    }
    .....
}
```
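
The race behind the suggestion can be reproduced without a broker. The sketch below is only an illustration: `ConsumerInner`, `ConsumerRegistry`, and `NullGuardDemo` are hypothetical stand-ins, not RocketMQ classes, and it assumes that shutting down a consumer removes its group from the table that `consumerRunningInfo` reads. Under that assumption, an unguarded lookup dereferences `null`, while the guarded variant returns `null` to the caller.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for MQConsumerInner; not the RocketMQ type.
interface ConsumerInner {
    String runningInfo();
}

// Hypothetical stand-in for the part of the client that owns consumerTable.
class ConsumerRegistry {
    private final Map<String, ConsumerInner> consumerTable = new ConcurrentHashMap<>();

    void register(String group, ConsumerInner consumer) {
        consumerTable.put(group, consumer);
    }

    // Assumption: consumer shutdown ends up removing the group like this.
    void unregister(String group) {
        consumerTable.remove(group);
    }

    // Unguarded lookup: throws NullPointerException if the group is gone.
    String runningInfoUnguarded(String group) {
        ConsumerInner inner = consumerTable.get(group);
        return inner.runningInfo();
    }

    // Guarded lookup: returns null if the group is no longer registered.
    String runningInfoGuarded(String group) {
        ConsumerInner inner = consumerTable.get(group);
        if (inner == null) {
            return null;
        }
        return inner.runningInfo();
    }
}

class NullGuardDemo {
    public static void main(String[] args) {
        ConsumerRegistry registry = new ConsumerRegistry();
        registry.register("group_a", () -> "running");
        System.out.println("before shutdown: " + registry.runningInfoGuarded("group_a"));

        registry.unregister("group_a"); // simulate the consumer shutting down
        System.out.println("after shutdown: " + registry.runningInfoGuarded("group_a"));

        try {
            registry.runningInfoUnguarded("group_a");
        } catch (NullPointerException e) {
            System.out.println("unguarded lookup threw NullPointerException");
        }
    }
}
```

With the guard in place, the caller has to treat a `null` result as "group not registered on this client" rather than as an error in the lookup itself.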