jinrongluo opened a new issue #2954:
URL: https://github.com/apache/rocketmq/issues/2954
I am playing around with message filter in RocketMQ, and I found the Tag
based and SQL-based filter not working when the consumers are in the same
Consumer Group. The following is the RocketMQ environment:
I run ProducerApp several times, with "tag1" or "tag2" in the messages. But
no message is received at both of the consumer Apps.
1. RocketMQ
Version 4.8.0, single instance
broker.conf
```
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
enablePropertyFilter=true
```
2. Consumer1 App
```
public class ConsumerApp {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumerGroup1");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("testtopic", "tag1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeConcurrentlyContext context) {
System.out.printf("consumer %s, group %s Receive New
Messages: %s %n", "consumer1", "consumerGroup1", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
```
3. Consumer2 App
```
public class ConsumerApp2 {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("consumerGroup1");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("testtopic", "tag2");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeConcurrentlyContext context) {
System.out.printf("consumer %s, group %s Receive New
Messages: %s %n", "consumer2", "consumerGroup1", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
```
4. Producer App
```
public class ProducerApp {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("10.213.96.192:9876");
producer.start();
Message msg = new Message("testtopic", "tag1",
"helloworld".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg);
producer.shutdown();
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]