---------------------------------producer---------------------------------------------
@Test
    public void sendTransactionMQ() throws MQClientException, 
UnsupportedEncodingException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new 
TransactionMQProducer(TRANSACTION_MQ_PRODUCER_GROUP_NAME);
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, 
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        Message msg = new Message("testMq", "TagA", "KEYi", ("Hello RocketMQ 
").getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    }
---------------------------------producer---------------------------------------------

------------------------------------------consumer--------------------------------------------
public static void main(String[] args) throws InterruptedException, 
MQClientException {


        //设置消费者组名
        DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer("transaction_mq_consumer");
        //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
        
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //指定nameServer的地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //指定订阅的topic及tag表达式
        consumer.subscribe("testMq", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs,
                                                            
ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(String.format("Custome2 message 
[%s],tagName[%s]",
                            new String(msg.getBody()),
                            msg.getTags()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者实例
        consumer.start();
        System.out.println("Consumer Started.");
    }
------------------------------------------consumer--------------------------------------------


[ Full content available at: https://github.com/apache/rocketmq/issues/475 ]
This message was relayed via gitbox.apache.org for [email protected]

Reply via email to