package com.weimai.rocket.filter;

import java.io.IOException;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {
    
    public static void main(String[] args) throws IOException {
        try {
            DefaultMQProducer producer = new 
DefaultMQProducer("sequence_producer");
 
            producer.setNamesrvAddr("127.0.0.1:9876");
 
            producer.start();
            // 订单列表
            for (int i = 0; i < 20; i++) {
                // 加个时间后缀

                Message msg = new Message("SequenceTopicTest", // topic
                                          "TagA", // tag
                                          ("Hello RocketMQ " + 
i).getBytes("utf-8")// body
                );
                msg.putUserProperty("SequenceId", String.valueOf(i));// 设置过了参数
                // 调用producer的send()方法发送消息
                // 这里调用的是同步的方式,所以会有返回结果
                SendResult sendResult = producer.send(msg);
                System.out.println(i + "" + sendResult.getSendStatus());
            }

            producer.shutdown();

        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}



package com.weimai.rocket.filter;

import java.io.IOException;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {

    public static void main(String[] args) throws MQClientException, 
IOException {
        DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);


        // String filterCode =
        // 
MixAll.file2String("E:\\workspace2\\rocketmq\\src\\main\\java\\com\\gwd\\rocketmq\\MyMessageFilter.java");
        String filterCode = 
MixAll.file2String("C:\\a_zf_secret\\MyMessageFilterImpl.java");

        consumer.subscribe("SequenceTopicTest", 
"com.weimai.rocket.filter.MyMessageFilterImpl", filterCode);

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs, ConsumeConcurrentlyContext context) {
                System.out.print(Thread.currentThread().getName() + " Receive 
New Messages: ");
                for (MessageExt msg : msgs) {
                    System.out.println("content:" + new String(msg.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

}



package com.weimai.rocket.filter;

import org.apache.rocketmq.common.filter.FilterContext;
import org.apache.rocketmq.common.filter.MessageFilter;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Message filter that accepts only messages whose {@code SequenceId} property is an even
 * integer.
 */
public class MyMessageFilterImpl implements MessageFilter {

    /**
     * Decides whether a message passes the filter.
     *
     * @param msg the message to inspect; its {@code SequenceId} property is read
     * @param context filter context (not used by this implementation)
     * @return {@code true} if {@code SequenceId} is present and parses to an even integer;
     *         {@code false} if it is absent, not a valid integer, or odd
     */
    @Override
    public boolean match(MessageExt msg, FilterContext context) {
        String property = msg.getProperty("SequenceId");
        if (property == null) {
            return false;
        }

        try {
            return Integer.parseInt(property) % 2 == 0;
        } catch (NumberFormatException e) {
            // A malformed SequenceId is treated as "no match" rather than an error.
            return false;
        }
    }
}


[ Full content available at: https://github.com/apache/rocketmq/issues/452 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to