Repository: incubator-rocketmq-site Updated Branches: refs/heads/asf-site 94c21bba6 -> c6a0f07ed
Update example of scheduled message Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/commit/3bb95024 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/tree/3bb95024 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/diff/3bb95024 Branch: refs/heads/asf-site Commit: 3bb950241e1e2d07aed7f7ed788d968a3dcece97 Parents: a84f68d Author: Li Zhanhui <[email protected]> Authored: Wed Jun 7 11:57:11 2017 +0800 Committer: Li Zhanhui <[email protected]> Committed: Wed Jun 7 11:57:11 2017 +0800 ---------------------------------------------------------------------- _docs/17-rmq-schedule-example.md | 105 +++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/3bb95024/_docs/17-rmq-schedule-example.md ---------------------------------------------------------------------- diff --git a/_docs/17-rmq-schedule-example.md b/_docs/17-rmq-schedule-example.md index e9610ee..0340f21 100644 --- a/_docs/17-rmq-schedule-example.md +++ b/_docs/17-rmq-schedule-example.md @@ -11,61 +11,72 @@ modified: 2017-04-24T15:01:43-04:00 ### What is scheduled message? -Scheduled messages differ from normal messages such that they won't be delivered until a provided time later. -If you use `DefaultMQPullConsumer` to consume message, you have to fetch message manually. There are other options availible but `MQPullConsumerScheduleService` is the easiest. -#### DefaultMQPullConsumer use case +Scheduled messages differ from normal messages in that they won't be delivered until a provided time later. -> First fetch subscribed queues of a topic +### Application + +1. Start consumer to wait for incoming subscribed messages ```java -Set<MessageQueue> testTopic = consumer.fetchSubscribeMessageQueues("testTopic"); +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import java.util.List; + +public class ScheduledMessageConsumer { + + public static void main(String[] args) throws Exception { + // Instantiate message consumer + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer"); + // Subscribe topics + consumer.subscribe("TestTopic", "*"); + // Register message listener + consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { + for (MessageExt message : messages) { + // Print approximate delay time period + System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + // Launch consumer + consumer.start(); + } +} ``` -> Second chose a queue to fetch message,and save queue offset manually. -#### Use MQPullConsumerScheduleService consume message +2. Send scheduled messages ```java -final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1"); - -scheduleService.setMessageModel(MessageModel.CLUSTERING); -scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() { - - @Override - public void doPullTask(MessageQueue mq, PullTaskContext context) { - MQPullConsumer consumer = context.getPullConsumer(); - try { - - long offset = consumer.fetchConsumeOffset(mq, false); - if (offset < 0) - offset = 0; - - PullResult pullResult = consumer.pull(mq, "*", offset, 32); - System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult); - switch (pullResult.getPullStatus()) { - case FOUND: - break; - case NO_MATCHED_MSG: - break; - case NO_NEW_MSG: - case OFFSET_ILLEGAL: - break; - default: - break; - } - consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); - - //consume message auto - context.setPullNextDelayTimeMillis(100); - } catch (Exception e) { - e.printStackTrace(); +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; + +public class ScheduledMessageProducer { + + public static void main(String[] args) throws Exception { + // Instantiate a producer to send scheduled messages + DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup"); + // Launch producer + producer.start(); + int totalMessagesToSend = 100; + for (int i = 0; i < totalMessagesToSend; i++) { + Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); + // This message will be delivered to consumer 10 seconds later. + message.setDelayTimeLevel(3); + // Send the message + producer.send(message); } - } -}); - -scheduleService.start(); -``` - -#### Have fun with `MQPullConsumerScheduleService`. + // Shutdown producer after use. + producer.shutdown(); + } + +} +``` \ No newline at end of file
