Lixianshengchao opened a new issue #619:
URL: https://github.com/apache/rocketmq-externals/issues/619


   
我的rocketmq配置是双主双从,异步模式的。我发现一个问题,就是我使用广播模式消费消息,消息有被消费者消费,但是rocketmq-console控制台依旧显示消息没被消费掉(消息堆积)。
   
   我的生产者代码如下
   `public static void main(String[] args) throws Exception {
           //1.创建消息生产者producer,并制定生产者组名
           DefaultMQProducer producer = new DefaultMQProducer("group1");
           //2.指定Nameserver地址
           producer.setNamesrvAddr("x.x.x.x:9876;x.x.x.x:9876");
           //3.启动producer
           producer.start();
   
           for (int i = 0; i < 10; i++) {
               //4.创建消息对象,指定主题Topic、Tag和消息体
               /**
                * 参数一:消息主题Topic
                * 参数二:消息Tag
                * 参数三:消息内容
                */
               Message msg = new Message("springboot-mq", "Tag1", ("Hello 
World" + i).getBytes());
               //5.发送消息
               SendResult result = producer.send(msg);
               //发送状态
               SendStatus status = result.getSendStatus();
   
               System.out.println("发送结果:" + result);
   
               //线程睡1秒
               TimeUnit.SECONDS.sleep(1);
           }
   
           //6.关闭生产者producer
          // producer.shutdown();
       }`
   消费者代码如下:
   `    public static void main(String[] args) throws Exception {
           //1.创建消费者Consumer,制定消费者组名
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
           //2.指定Nameserver地址
           consumer.setNamesrvAddr("x.x.x.x:9876;x.x.x.x:9876");
           //3.订阅主题Topic和Tag
           consumer.subscribe("springboot-mq", "*");
   
           //设定消费模式:负载均衡|广播模式
           consumer.setMessageModel(MessageModel.BROADCASTING);
           
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
   
           //4.设置回调函数,处理消息
           consumer.registerMessageListener(new MessageListenerConcurrently() {
   
               //接受消息内容cc
               @Override
               public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs, ConsumeConcurrentlyContext context) {
                   for (MessageExt msg : msgs) {
                       System.out.println("consumeThread=" + 
Thread.currentThread().getName() + "," + new String(msg.getBody()));
                   }
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
           });
           //5.启动消费者consumer
           consumer.start();
       }`
   
生产者把消息发送到rocketmq,消费者都可以消费到消息。但是消费者消费完消息后,rocketmq-console控制台依旧没有显示消息被消费掉(消息堆积)。如果把消费者采用集群模式消费,消息被消费后,rocektmq-console正常,没有消息堆积。
   
   
具体问题详情可以看[https://segmentfault.com/q/1010000023506758](https://segmentfault.com/q/1010000023506758)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to