---------------------------------producer
---------------------------------------------
@Test
public void sendTransactionMQ() throws MQClientException,
UnsupportedEncodingException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new
TransactionMQProducer(TRANSACTION_MQ_PRODUCER_GROUP_NAME);
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
Message msg = new Message("testMq", "TagA", "KEYi", ("Hello RocketMQ
").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
---------------------------------producer
---------------------------------------------
------------------------------------------consumer--------------------------------------------
public static void main(String[] args) throws InterruptedException,
MQClientException {
//设置消费者组名
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("transaction_mq_consumer");
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//指定nameServer的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//指定订阅的topic及tag表达式
consumer.subscribe("testMq", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(String.format("Custome2 message
[%s],tagName[%s]",
new String(msg.getBody()),
msg.getTags()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者实例
consumer.start();
System.out.println("Consumer Started.");
}
------------------------------------------consumer--------------------------------------------
[ Full content available at: https://github.com/apache/rocketmq/issues/475 ]
This message was relayed via gitbox.apache.org for [email protected]