fenghaichun opened a new issue #1261: 一个消费者组下的每个消费者为什么会消费到所有队列? URL: https://github.com/apache/rocketmq/issues/1261 部署环境:单机docker 部署方式:2个nameServer、2个Master、2个slave 操作步骤: 1. 创建一个生产者,队列数量采用默认的4个,代码如下: `public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("group_test"); // Specify name server addresses. producer.setNamesrvAddr("192.168.1.6:9876:192.168.1.6:9877"); //Launch the instance. producer.start(); int i = 0; while (true) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTestL" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); Thread.sleep(1000); } //Shut down once the producer instance is not longer in use. //producer.shutdown(); }` 2. 创建两个消费者,代码如下: `private static void consume(String instanceName) throws MQClientException { // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test"); // Specify name server addresses. consumer.setNamesrvAddr("192.168.1.6:9876:192.168.1.6:9877"); consumer.setInstanceName(instanceName); // Subscribe one more more topics to consume. consumer.subscribe("TopicTestL", "*"); consumer.setConsumerGroup("group_test"); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt m: msgs) { System.out.println("consume:" + instanceName + ",QueueId:" + m.getQueueId()); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); }` 两个消费者分别从不同的进程或者线程启动,看到打印的消费记录如下: `consume:consume1,QueueId:0 consume:consume1,QueueId:1 consume:consume1,QueueId:2 consume:consume1,QueueId:3 consume:consume2,QueueId:0 consume:consume2,QueueId:1 consume:consume2,QueueId:2 consume:consume2,QueueId:3` 问题:为什么每个消费者会消费到每个队列,或者说为什么一个消息队列会被多个消费者消费。(注意:消息本身是没有被重复消费),我期望的是每个消费者只消费一部分队列,每个消费队列一段时间只被一个消费者消费,是否我操作有误,还是设计如此?求解答,谢谢。
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
