package org.apache.rocketmq.example.filter;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SqlProducer {
public static void main(String[] args) {
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
return;
}
for (int i = 0; i < 10; i++) {
try {
String tag;
int div = i % 3;
if (div == 0) {
tag = "TagA";
} else if (div == 1) {
tag = "TagB";
} else {
tag = "TagC";
}
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
producer.shutdown();
}
}
package org.apache.rocketmq.example.filter;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
public class SqlConsumer {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("127.0.0.1:9876");
try {
consumer.subscribe("TopicTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA',
'TagB'))" +
"and (a is not null and a between 0 3)"));
} catch (MQClientException e) {
e.printStackTrace();
return;
}
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
try {
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
return;
}
System.out.printf("Consumer Started.%n");
}
}
when I start the consumer to consume, the following exception will occur.
15:48:17.493 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the
default logging framework
org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The
broker does not support consumer to filter message by SQL92
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at
org.apache.rocketmq.client.impl.MQClientAPIImpl.checkClientInBroker(MQClientAPIImpl.java:2089)
at
org.apache.rocketmq.client.impl.factory.MQClientInstance.checkClientInBroker(MQClientInstance.java:432)
at
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:633)
at
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:520)
at
org.apache.rocketmq.example.filter.SqlConsumer.main(SqlConsumer.java:56)
a
[ Full content available at: https://github.com/apache/rocketmq/issues/455 ]
This message was relayed via gitbox.apache.org for [email protected]