Ah39 opened a new issue #576: Two consumers in one JVM, shutting down the two clients may throw an error in unregisterConsumer
URL: https://github.com/apache/rocketmq/issues/576

**The two clients share the same MQClientInstance. If client A shuts down, it may shut down the MQClientInstance and the Netty client, so when client B shuts down and calls unregisterClient, the MQClientAPIImpl has already been shut down.**

If consumer.setInstanceName() is called, the two clients have different MQClientAPIImpl instances, so no error is thrown.

`import java.util.Iterator;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class TestClientShutdown {
    public static void main(String[] args) {
        TestClientShutdown testShutdown = new TestClientShutdown();
        try {
            testShutdown.testShutdown();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
    // @Test
    public void testShutdown() throws MQClientException {
        System.out.println("start");
        final DefaultMQPushConsumer consumer1 = createConsumer("hellogroup1");
        final DefaultMQPushConsumer consumer2 = createConsumer("hellogroup2");
        // Both threads block on this monitor so that they shut down at about the same time.
        final Object obj = new Object();
        Thread thread = new Thread() {
            @Override
            public void run() {
                synchronized (obj) {
                    try {
                        obj.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("helloworld shutdown consumer1");
                consumer1.shutdown();
            }
        };
        Thread thread2 = new Thread() {
            @Override
            public void run() {
                synchronized (obj) {
                    try {
                        obj.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("helloworld shutdown consumer2");
                consumer2.shutdown();
            }
        };
        thread.start();
        thread2.start();
        try {
            System.out.println("**************** start sleep **************************");
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Release both waiting threads so the two shutdown() calls race.
        synchronized (obj) {
            obj.notifyAll();
        }
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //consumer1.shutdown();
        //consumer2.shutdown();
        // If a single thread shuts the consumers down one after the other, there is no problem.
        // If two threads each shut down one consumer, the error occurs.
        // With consumer.setInstanceName(consumerGroup) there is no error: internally there is
        // a single shared instance, and setting the instance name gives two separate instances.
        System.out.println("start shutdown *******************");
    }
    public static DefaultMQPushConsumer createConsumer(String consumerGroup) throws MQClientException {
        final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr("10.126.84.164:9876;10.126.84.165:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("topic_test", "*");
        //consumer.setInstanceName(consumerGroup);
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                Iterator<MessageExt> it = msgs.iterator();
                while (it.hasNext()) {
                    MessageExt msgExt = it.next();
                    String str = new String(msgExt.getBody());
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + str);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        return consumer;
    }
    public static DefaultMQProducer GetProducer() throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("helloproducer22");
        producer.setNamesrvAddr("10.126.84.164:9876;10.126.84.165:9876");
        producer.setSendMsgTimeout(800);
        producer.start();
        return producer;
    }
}`
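
The report above attributes the failure to two consumers sharing one client instance. The following is a minimal sketch of one interleaving that would produce that symptom, runnable without RocketMQ on the classpath. It is a toy model, not RocketMQ code: `SharedClient`, `Registry`, and the three shutdown steps are hypothetical names, and the step order is an assumption about the internals chosen to match the behavior described in the report (sequential shutdown works, concurrent shutdown fails, distinct instance names work).

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model only: every class and method here is hypothetical, not RocketMQ code. */
final class SharedClientShutdownSketch {

    /** Stands in for the client instance (and its network layer) shared inside one JVM. */
    static final class SharedClient {
        private final Set<String> groups = ConcurrentHashMap.newKeySet();
        private volatile boolean closed;

        void register(String group) { groups.add(group); }

        /** Step 1 of a consumer shutdown: drop the group from the local table. */
        void removeLocal(String group) { groups.remove(group); }

        /** Step 2: tell the broker; this needs the network layer to be alive. */
        void unregisterAtBroker(String group) {
            if (closed) {
                throw new IllegalStateException("network client already shut down: " + group);
            }
        }

        /** Step 3: close the shared client once no consumer is left in the table. */
        void shutdownIfUnused() { if (groups.isEmpty()) closed = true; }
    }

    /** One shared client per instance name, created on first use (assumed keying). */
    static final class Registry {
        private final Map<String, SharedClient> clients = new ConcurrentHashMap<>();

        SharedClient clientFor(String instanceName, String group) {
            SharedClient c = clients.computeIfAbsent(instanceName, n -> new SharedClient());
            c.register(group);
            return c;
        }
    }

    /** Replays the order A1, B1, A2, A3, B2; returns true if B's step 2 fails. */
    static boolean interleavedShutdownFails(String nameA, String nameB) {
        Registry registry = new Registry();
        SharedClient a = registry.clientFor(nameA, "hellogroup1");
        SharedClient b = registry.clientFor(nameB, "hellogroup2");
        a.removeLocal("hellogroup1");
        b.removeLocal("hellogroup2");
        a.unregisterAtBroker("hellogroup1");
        a.shutdownIfUnused();               // table is empty if a == b, so the client closes
        try {
            b.unregisterAtBroker("hellogroup2");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Same steps, but consumer A finishes completely before consumer B starts. */
    static boolean sequentialShutdownFails(String nameA, String nameB) {
        Registry registry = new Registry();
        SharedClient a = registry.clientFor(nameA, "hellogroup1");
        SharedClient b = registry.clientFor(nameB, "hellogroup2");
        a.removeLocal("hellogroup1");
        a.unregisterAtBroker("hellogroup1");
        a.shutdownIfUnused();               // B is still registered, so nothing closes yet
        b.removeLocal("hellogroup2");
        try {
            b.unregisterAtBroker("hellogroup2");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(interleavedShutdownFails("DEFAULT", "DEFAULT"));
        System.out.println(interleavedShutdownFails("hellogroup1", "hellogroup2"));
        System.out.println(sequentialShutdownFails("DEFAULT", "DEFAULT"));
    }
}
```

Running `main` prints `true`, `false`, `false`: in this model only the interleaved shutdown on a shared instance fails, which mirrors the three cases described in the report. Whether the real client follows exactly these steps would need to be checked against the RocketMQ source.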
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

With regards,
Apache Git Services
