Ah39 opened a new issue #576: Two consumers in one JVM: shutting down both clients may throw an error in unregisterConsumer
URL: https://github.com/apache/rocketmq/issues/576
 
 
   **The two clients share the same MQClientInstance. If client A shuts down, it may shut down the MQClientInstance and the Netty client, so when client B shuts down and calls unregisterClient, MQClientAPIImpl has already been shut down.**
   If each consumer calls consumer.setInstanceName() with its own name, the two clients have different MQClientAPIImpl instances, so no error is thrown.
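
   The sharing described above can be illustrated with a small stand-alone model. This is only a sketch and does not use RocketMQ code: the `Instance` class, the `key` helper, the fixed IP, and the assumption that the shared instance is looked up by a key containing the instance name are all hypothetical, chosen to mirror the behaviour reported here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, NOT RocketMQ source: two clients whose lookup key is the
// same end up sharing one instance, so the first shutdown breaks the second.
public class SharedInstanceSketch {

    /** Stand-in for the shared client instance that owns the network client. */
    static final class Instance {
        private boolean closed;

        void unregister(String group) {
            if (closed) {
                throw new IllegalStateException("network client already shut down: " + group);
            }
        }

        void shutdown() {
            closed = true;
        }
    }

    /** Assumed key format: client IP plus instance name. */
    static String key(String instanceName) {
        return "127.0.0.1@" + instanceName;
    }

    /** Returns true if client B can still unregister after client A has shut down. */
    public static boolean shutdownBoth(String nameA, String nameB) {
        Map<String, Instance> table = new HashMap<>();
        Instance a = table.computeIfAbsent(key(nameA), k -> new Instance());
        Instance b = table.computeIfAbsent(key(nameB), k -> new Instance());
        a.shutdown(); // client A closes the instance it holds
        try {
            b.unregister("hellogroup2"); // client B unregisters afterwards
            b.shutdown();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("same name:      " + shutdownBoth("DEFAULT", "DEFAULT"));
        System.out.println("distinct names: " + shutdownBoth("hellogroup1", "hellogroup2"));
    }
}
```

   In this model the failure only appears when both clients resolve to the same key, which matches the observation that giving each consumer its own instance name avoids the error.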
   
![image](https://user-images.githubusercontent.com/2367243/49488130-5fb50000-f880-11e8-936c-ec593a12fe90.png)
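   `import java.util.Iterator;
   import java.util.List;

   import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
   import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
   import org.apache.rocketmq.client.exception.MQClientException;
   import org.apache.rocketmq.client.producer.DefaultMQProducer;
   import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
   import org.apache.rocketmq.common.message.MessageExt;

   public class TestClientShutdown {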
   
        public static void main(String[] args) {
                TestClientShutdown testShutdown = new TestClientShutdown();
                try {
                        testShutdown.testShutdown();
                } catch (MQClientException e) {
                        e.printStackTrace();
                }
                System.exit(0);
        }
   
        // @Test
        public void testShutdown() throws MQClientException {
                System.out.println("start");
                DefaultMQPushConsumer consumer1 = createConsumer("hellogroup1");
                DefaultMQPushConsumer consumer2 = createConsumer("hellogroup2");
                Object obj = new Object();
                Thread thread = new Thread() {
                        public void run() {
                                synchronized (obj) {
                                        try {
                                                obj.wait();
                                        } catch (InterruptedException e) {
                                                e.printStackTrace();
                                        }
                                }
                                System.out.println("helloworld shutdown consumer1");
                                consumer1.shutdown();
                        }
                };
   
                Thread thread2 = new Thread() {
                        public void run() {
                                synchronized (obj) {
                                        try {
                                                obj.wait();
                                        } catch (InterruptedException e) {
                                                e.printStackTrace();
                                        }
                                }
                                System.out.println("helloworld shutdown consumer2");
                                consumer2.shutdown();
                        }
                };
                thread.start();
                thread2.start();
                try {
                        System.out.println("**************** start sleep **************************");
                        Thread.sleep(10000);
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
   
                synchronized (obj) {
                        obj.notifyAll();
                }
                try {
                        Thread.sleep(20000);
                } catch (InterruptedException e) {
                        e.printStackTrace();
                }
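                //consumer1.shutdown();
                //consumer2.shutdown();
                // If a single thread shuts both consumers down in sequence, there is no problem.
                // If two threads each shut down one consumer, the error occurs.
                // With consumer.setInstanceName(consumerGroup) no error is thrown: internally
                // there is a single shared instance, and setting the instance name yields two.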
   
                System.out.println("start shutdown *******************");
        }
   
        public static DefaultMQPushConsumer createConsumer(String consumerGroup) throws MQClientException {
                final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
                consumer.setNamesrvAddr("10.126.84.164:9876;10.126.84.165:9876");
   
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
   
                consumer.subscribe("topic_test", "*");
                //consumer.setInstanceName(consumerGroup);
                consumer.setMessageListener(new MessageListenerConcurrently() {
   
                        @Override
                        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                                Iterator<MessageExt> it = msgs.iterator();
                                while (it.hasNext()) {
                                        MessageExt msgExt = it.next();
                                        String str = new String(msgExt.getBody());
                                        System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + str);
                                }
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   
                        }
   
                });
   
                consumer.start();
                return consumer;
        }
   
        public static DefaultMQProducer GetProducer() throws MQClientException {
                DefaultMQProducer producer = new DefaultMQProducer("helloproducer22");
                producer.setNamesrvAddr("10.126.84.164:9876;10.126.84.165:9876");
                producer.setSendMsgTimeout(800);
                producer.start();
                return producer;
        }
   }
   `
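
   The reproduction above releases the two shutdown threads with `wait()`/`notifyAll()` and a fixed sleep. As a possible alternative, the same "start both at once" step could be written with a `CountDownLatch`, which does not depend on the threads already waiting when the signal is sent. The class and method names below are hypothetical, and the tasks are plain `Runnable`s standing in for `consumer1.shutdown()` and `consumer2.shutdown()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a start gate that releases two threads at the same moment.
public class LatchGateSketch {

    /** Runs both tasks on their own threads, released together; returns how many completed. */
    public static int runTogether(Runnable first, Runnable second) {
        CountDownLatch gate = new CountDownLatch(1); // opened once to release both threads
        CountDownLatch done = new CountDownLatch(2); // counts thread completions
        AtomicInteger finished = new AtomicInteger();
        for (Runnable task : new Runnable[] {first, second}) {
            new Thread(() -> {
                try {
                    gate.await();               // block until the gate opens
                    task.run();                 // e.g. consumer.shutdown()
                    finished.incrementAndGet(); // reached only if the task did not throw
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        gate.countDown(); // release both threads
        try {
            done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        int n = runTogether(
                () -> System.out.println("shutdown consumer1"),
                () -> System.out.println("shutdown consumer2"));
        System.out.println("tasks completed without error: " + n);
    }
}
```

   A return value lower than 2 would indicate that one of the shutdown calls threw, which is the condition this issue is about.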
   
   
   
   
   
