package org.apache.rocketmq.example.filter;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SqlProducer {

    public static void main(String[] args) {
        DefaultMQProducer producer = new 
DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");

        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            return;
        }

        for (int i = 0; i < 10; i++) {
            try {
                String tag;
                int div = i % 3;
                if (div == 0) {
                    tag = "TagA";
                } else if (div == 1) {
                    tag = "TagB";
                } else {
                    tag = "TagC";
                }
                Message msg = new Message("TopicTest",
                    tag,
                    ("Hello RocketMQ " + 
i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                msg.putUserProperty("a", String.valueOf(i));

                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
        producer.shutdown();
    }
}



package org.apache.rocketmq.example.filter;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class SqlConsumer {

    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        try {
            consumer.subscribe("TopicTest",
                MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 
'TagB'))" +
                    "and (a is not null and a between 0  3)"));
        } catch (MQClientException e) {
            e.printStackTrace();
            return;
        }

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", 
Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            return;
        }
        System.out.printf("Consumer Started.%n");
    }
}


 when I start the consumer to consume, the following exception will occur.

15:48:17.493 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the 
default logging framework
org.apache.rocketmq.client.exception.MQClientException: CODE: 1  DESC: The 
broker does not support consumer to filter message by SQL92
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
        at 
org.apache.rocketmq.client.impl.MQClientAPIImpl.checkClientInBroker(MQClientAPIImpl.java:2089)
        at 
org.apache.rocketmq.client.impl.factory.MQClientInstance.checkClientInBroker(MQClientInstance.java:432)
        at 
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:633)
        at 
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:520)
        at 
org.apache.rocketmq.example.filter.SqlConsumer.main(SqlConsumer.java:56)
a



[ Full content available at: https://github.com/apache/rocketmq/issues/455 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to