jinrongluo opened a new issue #2954:
URL: https://github.com/apache/rocketmq/issues/2954


   I am experimenting with message filtering in RocketMQ and found that neither
tag-based nor SQL-based filtering works when the consumers are in the same
consumer group. I ran ProducerApp several times, with either "tag1" or "tag2"
as the message tag, but neither of the consumer apps received any messages.
   
   My RocketMQ environment is as follows:
   
   1. RocketMQ 
   Version 4.8.0, single instance
   
   broker.conf
   ```
   brokerClusterName = DefaultCluster
   brokerName = broker-a
   brokerId = 0
   deleteWhen = 04
   fileReservedTime = 48
   brokerRole = ASYNC_MASTER
   flushDiskType = ASYNC_FLUSH
   enablePropertyFilter=true
   ```
   
   2. Consumer1 App
   
   ```
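   import java.util.List;

   import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
   import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
   import org.apache.rocketmq.common.message.MessageExt;

   public class ConsumerApp {
       public static void main(String[] args) throws Exception {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup1");
           consumer.setNamesrvAddr("127.0.0.1:9876");
           // Tag-based filter: this consumer subscribes to "tag1" only.
           consumer.subscribe("testtopic", "tag1");
           consumer.registerMessageListener(new MessageListenerConcurrently() {
               @Override
               public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                               ConsumeConcurrentlyContext context) {
                   System.out.printf("consumer %s, group %s Receive New Messages: %s %n",
                           "consumer1", "consumerGroup1", msgs);
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
           });
           consumer.start();
       }
   }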
   ```
   
   3. Consumer2 App
   
   ```
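   import java.util.List;

   import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
   import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
   import org.apache.rocketmq.common.message.MessageExt;

   public class ConsumerApp2 {
       public static void main(String[] args) throws Exception {
           // Same consumer group as ConsumerApp, but a different tag.
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup1");
           consumer.setNamesrvAddr("127.0.0.1:9876");
           consumer.subscribe("testtopic", "tag2");
           consumer.registerMessageListener(new MessageListenerConcurrently() {
               @Override
               public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                               ConsumeConcurrentlyContext context) {
                   System.out.printf("consumer %s, group %s Receive New Messages: %s %n",
                           "consumer2", "consumerGroup1", msgs);
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
           });
           consumer.start();
       }
   }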
   ```
   
   4. Producer App
   
   ```
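   import org.apache.rocketmq.client.producer.DefaultMQProducer;
   import org.apache.rocketmq.common.message.Message;
   import org.apache.rocketmq.remoting.common.RemotingHelper;

   public class ProducerApp {
       public static void main(String[] args) throws Exception {
           DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
           // Name server address as given in this report (the consumers use 127.0.0.1:9876).
           producer.setNamesrvAddr("10.213.96.192:9876");
           producer.start();
           // Send one message to "testtopic" with tag "tag1".
           Message msg = new Message("testtopic", "tag1",
                   "helloworld".getBytes(RemotingHelper.DEFAULT_CHARSET));
           producer.send(msg);
           producer.shutdown();
       }
   }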
   ```
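
To make the expected behaviour concrete: with the apps above, a message tagged "tag1" should reach consumer1 and be skipped by consumer2. The following is only a minimal sketch of that expectation, not RocketMQ code. `TagFilterSketch` and `matches` are hypothetical names, and the sketch assumes a subscription expression is either `*`, a single tag, or several tags joined by `||`.

```java
// Hypothetical stand-in for tag-based filtering; this is NOT RocketMQ code.
class TagFilterSketch {

    // Returns true when messageTag satisfies subExpression.
    // Assumed expression forms: "*" (or empty) for all tags, "tag1",
    // or alternatives joined by "||" such as "tag1 || tag2".
    static boolean matches(String subExpression, String messageTag) {
        if (subExpression == null) {
            return true; // no expression: treated as "subscribe to everything"
        }
        String expr = subExpression.trim();
        if (expr.isEmpty() || expr.equals("*")) {
            return true;
        }
        if (messageTag == null) {
            return false; // untagged message cannot match a specific tag
        }
        // Split on the literal "||" separator and compare each alternative.
        for (String tag : expr.split("\\|\\|")) {
            if (tag.trim().equals(messageTag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String producedTag = "tag1"; // tag used by ProducerApp
        System.out.println("consumer1 (tag1): " + matches("tag1", producedTag));
        System.out.println("consumer2 (tag2): " + matches("tag2", producedTag));
    }
}
```

Running `main` prints `true` for the "tag1" subscription and `false` for the "tag2" subscription. That is the delivery pattern expected from the two consumers, rather than no messages at all.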



