package com.weimai.rocket.filter;
import java.io.IOException;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
 * Sample producer that synchronously sends 20 messages to the {@code SequenceTopicTest} topic.
 *
 * <p>Every message carries a {@code SequenceId} user property holding the loop index, so that
 * a filter on the consuming side can select messages by that property.
 */
public class Producer {

    /**
     * Starts a producer in group {@code sequence_producer} against the name server at
     * {@code 127.0.0.1:9876}, sends the messages and prints the send status of each one.
     *
     * <p>The producer is shut down in a {@code finally} block so that it is released even when
     * starting or sending fails.
     *
     * @param args unused
     * @throws IOException if the {@code "utf-8"} charset name is rejected while encoding a
     *                     message body ({@link String#getBytes(String)})
     */
    public static void main(String[] args) throws IOException {
        DefaultMQProducer producer = new DefaultMQProducer("sequence_producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        try {
            producer.start();
            // Order list
            for (int i = 0; i < 20; i++) {
                Message msg = new Message("SequenceTopicTest", // topic
                    "TagA", // tag
                    ("Hello RocketMQ " + i).getBytes("utf-8") // body
                );
                // Property the consumer-side filter matches on.
                msg.putUserProperty("SequenceId", String.valueOf(i));
                // send() is the synchronous variant, so it returns a result we can inspect.
                SendResult sendResult = producer.send(msg);
                System.out.println(i + "" + sendResult.getSendStatus());
            }
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Always release the producer, including on the failure paths above.
            producer.shutdown();
        }
    }
}
package com.weimai.rocket.filter;
import java.io.IOException;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws MQClientException,
IOException {
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// String filterCode =
//
MixAll.file2String("E:\\workspace2\\rocketmq\\src\\main\\java\\com\\gwd\\rocketmq\\MyMessageFilter.java");
String filterCode =
MixAll.file2String("C:\\a_zf_secret\\MyMessageFilterImpl.java");
consumer.subscribe("SequenceTopicTest",
"com.weimai.rocket.filter.MyMessageFilterImpl", filterCode);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs, ConsumeConcurrentlyContext context) {
System.out.print(Thread.currentThread().getName() + " Receive
New Messages: ");
for (MessageExt msg : msgs) {
System.out.println("content:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
package com.weimai.rocket.filter;
import org.apache.rocketmq.common.filter.FilterContext;
import org.apache.rocketmq.common.filter.MessageFilter;
import org.apache.rocketmq.common.message.MessageExt;
/**
 * Message filter that accepts only messages whose {@code SequenceId} property is an even
 * integer.
 */
public class MyMessageFilterImpl implements MessageFilter {

    /**
     * Decides whether a message passes the filter.
     *
     * @param msg     message whose {@code SequenceId} property is inspected
     * @param context filter context; not used by this implementation
     * @return {@code true} if {@code SequenceId} is present and parses to an even integer;
     *         {@code false} if it is absent, odd, or not a valid integer
     */
    @Override
    public boolean match(MessageExt msg, FilterContext context) {
        String property = msg.getProperty("SequenceId");
        if (property == null) {
            return false;
        }
        try {
            return Integer.parseInt(property) % 2 == 0;
        } catch (NumberFormatException ignored) {
            // A non-numeric SequenceId cannot be even; reject the message instead of letting
            // the exception escape from the filter.
            return false;
        }
    }
}
[ Full content available at: https://github.com/apache/rocketmq/issues/452 ]
This message was relayed via gitbox.apache.org for [email protected]