Repository: incubator-rocketmq-site Updated Branches: refs/heads/master 0446b8b89 -> beb14e500
Add batch broadcast arc and schedule example documents, closes apache/incubator-rocketmq-site#10 Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/commit/beb14e50 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/tree/beb14e50 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/diff/beb14e50 Branch: refs/heads/master Commit: beb14e500eab7a84ed90601a867b624cac470c07 Parents: 0446b8b Author: lindzh <[email protected]> Authored: Tue Apr 25 00:59:49 2017 +0800 Committer: yukon <[email protected]> Committed: Tue Apr 25 00:59:49 2017 +0800 ---------------------------------------------------------------------- _data/navigation.yml | 10 +- _docs/12-rmq-batch-example.md | 101 ++++++++++++++++ _docs/13-rmq-broadcasting-example.md | 66 ++++++++++ _docs/14-rmq-deployment.md | 145 ++++++++++++++++++++++ _docs/15-order-message-example .md | 86 +++++++++++++ _docs/16-rmq-architecture.md | 68 +++++++++++ _docs/17-rmq-schedule-example.md | 70 +++++++++++ _docs/18-simple-example.md | 187 +++++++++++++++++++++++++++++ _sass/_navigation.scss | 4 +- assets/images/rmq-basic-arc.png | Bin 0 -> 69606 bytes assets/images/rmq-basic-component.png | Bin 0 -> 69730 bytes 11 files changed, 731 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/beb14e50/_data/navigation.yml ---------------------------------------------------------------------- diff --git a/_data/navigation.yml b/_data/navigation.yml index 4b86674..b239dd5 100644 --- a/_data/navigation.yml +++ b/_data/navigation.yml @@ -27,17 +27,19 @@ docs: url: /docs/simple-example/ - title: "Order Example" url: /docs/order-example/ + - title: "Broadcasting Example" + url: /docs/broadcast-example/ - title: "Schedule Example" url: /docs/schedule-example/ - - title: "Schedule Example" - url: /docs/schedule-example/ + - title: "Batch Example" + url: /docs/batch-example/ - title: Deployment & Operations children: - title: "Architecture" - url: /docs/architecture/ + url: /docs/rmq-arc/ - title: "Deployment" - url: /docs/deployment/ + url: /docs/rmq-deployment/ - title: "CLI Admin Tool" url: /docs/cli-admin-tool/ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/beb14e50/_docs/12-rmq-batch-example.md ---------------------------------------------------------------------- diff --git a/_docs/12-rmq-batch-example.md b/_docs/12-rmq-batch-example.md new file mode 100644 index 0000000..01b5d32 --- /dev/null +++ b/_docs/12-rmq-batch-example.md @@ -0,0 +1,101 @@ +--- +title: "Batch Example" +permalink: /docs/batch-example/ +excerpt: "How to use batch in Rocketmq" +modified: 2017-04-24T15:01:43-04:00 +--- + +{% include toc %} + +#### When to use batch +Batch is not for packaging but improving performance of small messages. So the messages of the same batch should act the same role, no more effort should be taken to split the batch. +No split has another important advantage, messages of the same batch should be sent atomically, that is all successfully or all unsuccessfully, of which the importance is self-evident. +So performance and atomicity are the original intentions, which will reflect on the usage constraints. +That is to say, if you want to improve performance for small messages or to send messages atomically, batch is a nice solution for you. +#### Usage constraints +Performance and atomicity are worth the following constraints: +messages of the same batch should have: + +1. same topic: If they belong to different topics(internally the queues), then may be sent to different brokers, which will against atomicity. +2. same waitStoreMsgOK: also differences will against atomicity. +3. no delay level: If we care about the delay level, we need to decode the internal properties of every message, which will cause much performance loss. + +And the most important, the total size, that is the sum of size of each message in one batch, should be no more than 1M. + +#### How to use batch +If you just send several small messages in a time and do not need to worry about the size limit, it is easy to use batch: + +```java +String topic = "BatchTest"; +List<Message> messages = new ArrayList<>(); +messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes())); +messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes())); +messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes())); +try { + producer.send(messages); +} catch (Exception e) { + e.printStackTrace(); + //handle the error +} + +``` +#### Split into lists +The complexity only grow when you send large batch and you may not sure if it exceeds the size limit (1M). + +At this time, you'd better split the lists: + +```java +public class ListSplitter implements Iterator<List<Message>> { + private final int SIZE_LIMIT = 1000 * 1000; + private final List<Message> messages; + private int currIndex; + public ListSplitter(List<Message> messages) { + this.messages = messages; + } + @Override public boolean hasNext() { + return currIndex < messages.size(); + } + @Override public List<Message> next() { + int nextIndex = currIndex; + int totalSize = 0; + for (; nextIndex < messages.size(); nextIndex++) { + Message message = messages.get(nextIndex); + int tmpSize = message.getTopic().length() + message.getBody().length; + Map<String, String> properties = message.getProperties(); + for (Map.Entry<String, String> entry : properties.entrySet()) { + tmpSize += entry.getKey().length() + entry.getValue().length(); + } + tmpSize = tmpSize + 20; //for log overhead + if (tmpSize > SIZE_LIMIT) { + //it is unexpected that single message exceeds the SIZE_LIMIT + //here just let it go, otherwise it will block the splitting process + if (nextIndex - currIndex == 0) { + //if the next sublist has no element, add this one and then break, otherwise just break + nextIndex++; + } + break; + } + if (tmpSize + totalSize > SIZE_LIMIT) { + break; + } else { + totalSize += tmpSize; + } + + } + List<Message> subList = messages.subList(currIndex, nextIndex); + currIndex = nextIndex; + return subList; + } +} +//then you could split the large list into small ones: +ListSplitter splitter = new ListSplitter(messages); +while (splitter.hasNext()) { + try { + List<Message> listItem = splitter.next(); + producer.send(listItem); + } catch (Exception e) { + e.printStackTrace(); + //handle the error + } +} +``` \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/beb14e50/_docs/13-rmq-broadcasting-example.md ---------------------------------------------------------------------- diff --git a/_docs/13-rmq-broadcasting-example.md b/_docs/13-rmq-broadcasting-example.md new file mode 100644 index 0000000..63a10fb --- /dev/null +++ b/_docs/13-rmq-broadcasting-example.md @@ -0,0 +1,66 @@ +--- +title: "Broadcasting" +permalink: /docs/broadcast-example/ +excerpt: "How to send broadcast messages in Apache RocketMQ." +modified: 2017-04-24T15:01:43-04:00 +--- + +{% include toc %} + +#### What is Broadcasting +Broadcasting is when sending a meeage to a topic,all subscribers of the topic will receive the message even if they are in the same consumer group.If you want all subscribers in a group receive all the messages in a topic,broadcasting is a good choice. + +#### How to use + +##### First,Produce message as before + +```java +DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); + +producer.start(); + +for (int i = 0; i < 10000000; i++){ + try { + { + Message msg = new Message("TopicTest", + "TagA", + "OrderID188", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + } + + } catch (Exception e) { + e.printStackTrace(); + } +} +producer.shutdown(); +``` + +##### Second,Consume message in Broadcast mode + +```java +DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); + +consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + +//set to broadcast mode +consumer.setMessageModel(MessageModel.BROADCASTING); + +consumer.subscribe("TopicTest", "TagA || TagC || TagD"); + +consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, + ConsumeConcurrentlyContext context) { + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } +}); + +consumer.start(); +System.out.printf("Broadcast Consumer Started.%n"); +``` + +Enjoy it. http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/beb14e50/_docs/14-rmq-deployment.md ---------------------------------------------------------------------- diff --git a/_docs/14-rmq-deployment.md b/_docs/14-rmq-deployment.md new file mode 100644 index 0000000..0f4af22 --- /dev/null +++ b/_docs/14-rmq-deployment.md @@ -0,0 +1,145 @@ +--- +title: "Deployment" +permalink: /docs/rmq-deployment/ +excerpt: "How to deploy the Apache RocketMQ." +modified: 2017-04-24T15:01:43-04:00 +--- + +{% include toc %} + +This section is to introduce deployment solution which is considered production-ready. Generally speaking, we are deploying a resilient RocketMQ cluster having no single point of failure. + +### Prerequisite +Before starting this section, make sure you have read Quick Start section, in which core concepts and components of RocketMQ are introduced. + +#### Production-ready Deployment +#####Name Server +To ensure the cluster can still operate normally when one instance crashes, two or more name server instances are recommended. As long as there is at least one name server instance alive, the whole cluster remains serving. + +Name server follows share-nothing design paradigm. Brokers send heartbeat data to all name servers. Producers and consumers may query meta data from any one of name servers available while sending / consuming messages. + +#### Broker +Brokers can be divided into two categories according to their roles: master and slave. Master brokers provide RW access while slave brokers only accept read access. + +To deploy a high-availability RocketMQ cluster which has no single point of failure, a series of broker sets should be deployed. One broker set contains one master with brokerId set to 0 and several slaves with non-zero brokerIDs. All of the brokers in one set have the same brokerName. In serious scenarios, we should have at least two brokers in one broker set. Each topic resides in two or more brokers. + +### Configuration +When deploying a RocketMQ cluster, below configurations should be taken into consideration. + +##### Broker configuration + +| Property Name | Default values | Details | +| ----------------- |:------------------:| ---------------:| +| listenPort | 10911 | listen port for client | +| namesrvAddr | null | name server address | +| brokerIP1 | InetAddress for network interface | Should be configured if having multiple addresses | +| brokerName | null | broker name | +| brokerClusterName | DefaultCluster |this broker belongs to which cluster | +| brokerId | 0 |broker id, 0 means master, positive integers mean slave | +| storePathCommitLog | $HOME/store/commitlog/ |file path for commit log | +| storePathConsumerQueue | $HOME/store/consumequeue/ | file path for consume queue | +| mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | mapped file size for commit log | +| deleteWhen | 04 |When to delete the commitlog which is out of the reserve time | +| fileReserverdTime | 72 |The number of hours to keep a commitlog before deleting it | +| brokerRole | ASYNC_MASTER |SYNC_MASTER/ASYNC_MASTER/SLVAE | +| flushDiskType | ASYNC_FLUSH |{SYNC_FLUSH/ASYNC_FLUSH}. Broker of SYNC_FLUSH mode flushes each message onto disk before acknowledging producer. Broker of ASYNC_FLUSH mode, on the other hand, takes advantage of group-committing, achieving better performance.| + +### CLI Admin Tool +RocketMQ provides a CLI(command-line interface) admin tool belt to query, manage and diagnose various issues. + +#### How To Get +The admin tool is shipped along with RocketMQ. Either you download a pre-built binary version or build from source by yourself, you already have it. + +In case you have source code, the rocketmq-tools module contains its source code. + +#### How to use +The Admin Tool is very easy to use. Here, for demonstration purpose, *nix environment is assumed. + +Change directory to ${PACKAGE}/bin, command bash mqadmin, you should see the following help menu pops out: + +```java +The most commonly used mqadmin commands are: + updateTopic Update or create topic + deleteTopic Delete topic from broker and NameServer + updateSubGroup Update or create subscription group + deleteSubGroup Delete subscription group from broker + updateBrokerConfig Update broker's config + updateTopicPerm Update topic perm + topicRoute Examine topic route info + topicStatus Examine topic Status info + topicClusterList get cluster info for topic + brokerStatus Fetch broker runtime status data + queryMsgById Query Message by Id + queryMsgByKey Query Message by Key + queryMsgByUniqueKey Query Message by Unique key + queryMsgByOffset Query Message by offset + queryMsgByUniqueKey Query Message by Unique key + printMsg Print Message Detail + sendMsgStatus send msg to broker + brokerConsumeStats Fetch broker consume stats data + producerConnection Query producer's socket connection and client version + consumerConnection Query consumer's socket connection, client version and subscription + consumerProgress Query consumers's progress, speed + consumerStatus Query consumer's internal data structure + cloneGroupOffset clone offset from other group + clusterList List all of clusters + topicList Fetch all topic list from name server + updateKvConfig Create or update KV config + deleteKvConfig Delete KV config + wipeWritePerm Wipe write perm of broker in all name server + resetOffsetByTime Reset consumer offset by timestamp(without client restart) + updateOrderConf Create or update or delete order conf + cleanExpiredCQ Clean expired ConsumeQueue on broker. + cleanUnusedTopic Clean unused topic on broker + startMonitoring Start Monitoring + statsAll Topic and Consumer tps stats + syncDocs Synchronize wiki and issue to github.com + allocateMQ Allocate MQ + checkMsgSendRT check message send response time + clusterRT List All clusters Message Send RT + +``` +See 'mqadmin help <command>' for more information on a specific command. +If you want to get more information about a specific command like 'clusterList', just type bash mqadmin help clusterList and you will see: + +```java +usage: mqadmin clusterList [-h] [-i <arg>] [-m] [-n <arg>] + -h,--help Print help + -i,--interval <arg> specify intervals numbers, it is in seconds + -m,--moreStats Print more stats + -n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 +``` + +### Replication mode +With the purpose to guarantee that any successfully published message will not be lost, RocketMQ provides a Replication mode to gain stronger durability and higher availability with two replication way: Sync & Async. + +#####Replication: Sync / Async Broker + +Similar to many replication system, sync brokers await until commit log is replicated to the slave before acknowledging. Async brokers, instead, return immediately after messages are processed on master. + +##### How to configure +There are three pre-built configurations shipped with the distribution of RocketMQ under conf folder for your reference: + +```java +2m-2s-sync +2m-2s-async +2m-noslave +``` +Note: all configurations uses ASYNC_FLUSH. + +#### Deployment +Take the deployment of 2m-2s-sync as example. First, start up two name servers as is shown in the Quick Start section. Assume their IPs are 192.168.0.2 and 192.168.0.3. + +Then start the brokers(Assume binary RocketMQ is at /home/rocketmq/dist) + +```java +>cd /home/rocketmq/dist/bin +>bash mqbroker -c ../conf/2m-2s-sync/broker-a.properties -n 192.168.0.2:9876,192.168.0.3:9876 +>bash mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties -n 192.168.0.2:9876,192.168.0.3:9876 +>bash mqbroker -c ../conf/2m-2s-sync/broker-b.properties -n 192.168.0.2:9876,192.168.0.3:9876 +>bash mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties -n 192.168.0.2:9876,192.168.0.3:9876 +How to verify +Execute the following command to verify according to the CLI section: +> bash mqadmin clusterlist +``` + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/beb14e50/_docs/15-order-message-example .md ---------------------------------------------------------------------- diff --git a/_docs/15-order-message-example .md b/_docs/15-order-message-example .md new file mode 100644 index 0000000..0375322 --- /dev/null +++ b/_docs/15-order-message-example .md @@ -0,0 +1,86 @@ +--- +title: "Order Message " +permalink: /docs/order-example/ +excerpt: "How to send and receive ordered messages in Apache RocketMQ." +modified: 2017-04-24T15:01:43-04:00 +--- + + +{% include toc %} + + +To send and subscribe to order messages, use Java MQ SDK 1.2.7 and above. The sequential message is a kind of message type which is provided by MQ according to the order, which is suitable for the news release and consumption according to the principle of fifo. + +For more information, please refer to the sequential message file. +The global order message and the partition order message send and receive the way to be the same basically, please refer to the following example code specifically. + +#### Send message sample code + +```java +try { + MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); + producer.start(); + + String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; + for (int i = 0; i < 100; i++) { + int orderId = i % 10; + Message msg = + new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, + ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg, new MessageQueueSelector() { + @Override + public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { + Integer id = (Integer) arg; + int index = id % mqs.size(); + return mqs.get(index); + } + }, orderId); + + System.out.printf("%s%n", sendResult); + } + + producer.shutdown(); +} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { + e.printStackTrace(); +} +``` + + +#### Subscription message sample code + +```java +DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); + +consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + +consumer.subscribe("TopicTest", "TagA || TagC || TagD"); + +consumer.registerMessageListener(new MessageListenerOrderly() { + + AtomicLong consumeTimes = new AtomicLong(0); + @Override + public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, + ConsumeOrderlyContext context) { + context.setAutoCommit(false); + System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); + this.consumeTimes.incrementAndGet(); + if ((this.consumeTimes.get() % 2) == 0) { + return ConsumeOrderlyStatus.SUCCESS; + } else if ((this.consumeTimes.get() % 3) == 0) { + return ConsumeOrderlyStatus.ROLLBACK; + } else if ((this.consumeTimes.get() % 4) == 0) { + return ConsumeOrderlyStatus.COMMIT; + } else if ((this.consumeTimes.get() % 5) == 0) { + context.setSuspendCurrentQueueTimeMillis(3000); + return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + return ConsumeOrderlyStatus.SUCCESS; + + } +}); + +consumer.start(); + +System.out.printf("Consumer Started.%n"); +``` + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/beb14e50/_docs/16-rmq-architecture.md ---------------------------------------------------------------------- diff --git a/_docs/16-rmq-architecture.md b/_docs/16-rmq-architecture.md new file mode 100644 index 0000000..fc2774e --- /dev/null +++ b/_docs/16-rmq-architecture.md @@ -0,0 +1,68 @@ +--- +title: "RocketMQ Architecture" +permalink: /docs/rmq-arc/ +excerpt: "The main Architecture of Apache RocketMQ." +modified: 2017-04-24T15:01:43-04:00 +--- + + +{% include toc %} + + + + +# Overview + +The greatest innovation of RocketMQ is high throughput, high reliability, and low latency of supporting massive messages through exquisite scale out and scale up. RocketMQ consists of four parts, Name Servers, Brokers, Producers and Consumers. Each of them can be horizontally extended without single Point of Failure. As shown in screenshot below. + + + +**NameServer Cluster** + +Name Servers provide lightweight service discovery and routing. Each Name Server records full routing information, provides equivalent reading and writing service, and supports fast storage expansion. + +**Broker Cluster** + +Brokers take care of message storage by providing lightweight TOPIC and QUEUE mechanisms. It supports the Push and Pull modes, fault tolerance mechanism (2 copies or 3 copies), strong padding of peaks and capacity of accumulating hundreds of billion messages in their original time order. In addition, Brokers provide disaster recovery, rich metrics statistics, and alert mechanisms, all of which are lack in traditional messaging systems. + +**Producer Cluster** + +Producers support distributed deployment. Distributed Producers send messages to the Broker cluster through multiple load balancing modes. The sending processes support fast failure and have low latency. + +**Consumer Cluster** + +Consumers support distributed deployment in the Push and Pull models as well. It also supports cluster consumption and message broadcast. It provides real-time message subscription mechanism and can meet most consumer scenarios. +RocketMQâs website provide a quick-start guide[3] for those guys who want to have a try without too much labor. + +# NameServer + +NameServer is a little but fully functional server, which mainly includes two features: + +* Broker Management, **NameServer** accepts the register from Broker cluster and provides heartbeat mechanism to ensure whether a broker is alive. +* Routing Management, each NameServer will hold whole routing info about the broker cluster and the **queue** info for clients query. + +As we know, RocketMQ clients(Producer/Consumer) will query the queue routing info from NameServer, but how do the clients find NameServer address? + +There are four methods to feed NameServer address list to clients: + +* Programmatic Way, like `producer.setNamesrvAddr("ip:port")`. +* Java Options, use `rocketmq.namesrv.addr`. +* Environment Variable, use `NAMESRV_ADDR`. +* HTTP Endpoint. + +More details about how to find NameServer address please refer to [here](/rocketmq/four-methods-to-feed-name-server-address-list/). + +# Broker Server + +Broker server is responsible for message store and delivery, message query, HA guarantee, and so on. + +As shown in image below, Broker server has searval important sub modules: + +* Remoting Module, the entry of broker, handle the requests from clients. +* Client Manager, manage the clients (Producer/Consumer), maintain topic subscription of consumer. +* Store Service, provide simple APIs to store or query message in physical disk. +* HA Service, provide data sync feature between master broker and slave broker. +* Index Service, build index for messages by specified key and provide quick message query function. + + + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/beb14e50/_docs/17-rmq-schedule-example.md ---------------------------------------------------------------------- diff --git a/_docs/17-rmq-schedule-example.md b/_docs/17-rmq-schedule-example.md new file mode 100644 index 0000000..a55b46d --- /dev/null +++ b/_docs/17-rmq-schedule-example.md @@ -0,0 +1,70 @@ +--- +title: "Schedule example" +permalink: /docs/schedule-example/ +excerpt: "How to use schedule component to reduce pull in RocketMQ." +modified: 2017-04-24T15:01:43-04:00 +--- + + + +{% include toc %} + + +### What is mq schedule? +If you use `DefaultMQPullConsumer` to consume message,you have to fetch message manualy.There are some steps here to achieve this point.But with `MQPullConsumerScheduleService`,you will consume messages easily. + +#### DefaultMQPullConsumer use case + +> First fetch subscribed queues of a topic + +```java +Set<MessageQueue> testTopic = consumer.fetchSubscribeMessageQueues("testTopic"); +``` + +> Second chose a queue to fetch message,and save queue offset manually. + +#### Use MQPullConsumerScheduleService consume message + +```java +final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1"); + +scheduleService.setMessageModel(MessageModel.CLUSTERING); +scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() { + + @Override + public void doPullTask(MessageQueue mq, PullTaskContext context) { + MQPullConsumer consumer = context.getPullConsumer(); + try { + + long offset = consumer.fetchConsumeOffset(mq, false); + if (offset < 0) + offset = 0; + + PullResult pullResult = consumer.pull(mq, "*", offset, 32); + System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult); + switch (pullResult.getPullStatus()) { + case FOUND: + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + case OFFSET_ILLEGAL: + break; + default: + break; + } + consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset()); + + //consume message auto + context.setPullNextDelayTimeMillis(100); + } catch (Exception e) { + e.printStackTrace(); + } + } +}); + +scheduleService.start(); +``` + +#### Have fun with `MQPullConsumerScheduleService`. + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/beb14e50/_docs/18-simple-example.md ---------------------------------------------------------------------- diff --git a/_docs/18-simple-example.md b/_docs/18-simple-example.md new file mode 100644 index 0000000..92ac35b --- /dev/null +++ b/_docs/18-simple-example.md @@ -0,0 +1,187 @@ +--- +title: "Simple Message Example" +permalink: /docs/simple-example/ +excerpt: "How to send simple message to reduce pull in RocketMQ." +modified: 2017-04-24T15:01:43-04:00 +--- + + + +{% include toc %} + +Use Rocketmq to send ordinary messages have three ways: reliable synchronous transmission, reliable +asynchronous transmission, one-way (Oneway) send. + +This paper introduces the principle of +each implementation, the use of the scene and the similarities and differences between the +three implementations, and provides a code example for reference +Reliable synchronous transmission +Principle: synchronous transmission refers to the sender of the message issued after the +data are received after the recipient sends back a packet should be made under the mode of +communication +Application scenarios: this way the application of the scene is very extensive, such as +important notification messages, SMS notification, SMS marketing system, etc.. + +```java +public class Producer { + public static void main(String[] args) throws MQClientException, + InterruptedException { + /* + * Instantiate with a producer group name. + */ + DefaultMQProducer producer = new + DefaultMQProducer("please_rename_unique_group_name"); + /* + * Specify name server addresses. + * <p/> + * + * Alternatively, you may specify name server addresses via exporting + environmental variable: NAMESRV_ADDR + * <pre> + * {@code + * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); + * } + * </pre> + */ + /* + * Launch the instance. + */ + producer.start(); + for (int i = 0; i < 1000; i++) { + try { + /* + * Create a message instance, specifying topic, tag and message + body. + */ + Message msg = new Message("TopicTest" /* Topic */, + "TagA" /* Tag */, + ("Hello RocketMQ " + + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ + ); + /* + * Call send message to deliver message to one of brokers. + */ + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + } catch (Exception e) { + e.printStackTrace(); + Thread.sleep(1000); + } + } + /* + * Shut down once the producer instance is not longer in use. + */ + producer.shutdown(); + } +} + +``` +#### Reliable asynchronous transmission +Principle: asynchronous transmission refers to the sender sends the data, not the receiver +back to respond, and then send the next packet communication. Asynchronous send MQ, +users need to realize asynchronous callback interface (SendCallback), in the implementation +of asynchronous send message to the application server, without waiting for a response can +be returned directly to the receiving server, through the callback interface response, and +response to the server for processing results. + + +Applications: asynchronous transmission is generally used to link time-consuming, RT +response time sensitive business scenarios, such as user video upload after notification to +start transcoding, the transcoding after push notification transcoding results. + +```java +public class AsyncProducer { + public static void main(String[] args) throws MQClientException, + InterruptedException, UnsupportedEncodingException { + DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); + producer.start(); + producer.setRetryTimesWhenSendAsyncFailed(0); + for (int i = 0; i < 10000000; i++) { + try { + final int index = i; + Message msg = new Message("Jodie_topic_1023", + "TagA", + "OrderID188", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + producer.send(msg, new SendCallback() { + @Override + public void onSuccess(SendResult sendResult) { + System.out.printf("%-10d OK %s %n", index, + sendResult.getMsgId()); + } + @Override + public void onException(Throwable e) { + System.out.printf("%-10d Exception %s %n", index, e); + e.printStackTrace(); + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + } + producer.shutdown(); + } +} +``` + +Unidirectional (Oneway) transmission +Principle: one-way (Oneway) to send the message is only responsible for sending the +message, do not wait for the server to respond and no callback function to trigger, that is, +send the request does not wait for a reply. The process of sending messages in this way +takes a very short time, usually at the microsecond level. +Application scenarios: for some very short, but not high reliability requirements of the scene, +such as log collection. + +```java +public class producerOneWay { + public static void main(String[] args) throws MQClientException, + InterruptedException, MQClientException { + /* + * Instantiate with a producer group name. + */ + DefaultMQProducer producer = new + DefaultMQProducer("please_rename_unique_group_name"); + /* + * Specify name server addresses. + * <p/> + * + * Alternatively, you may specify name server addresses via exporting + environmental variable: NAMESRV_ADDR + * <pre> + * {@code + * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); + * } + * </pre> + */ + /* + * Launch the instance. + */ + producer.start(); + for (int i = 0; i < 1000; i++) { + try { + /* + * Create a message instance, specifying topic, tag and message + body. + */ + Message msg = new Message("TopicTest" /* Topic */, + "TagA" /* Tag */, + ("Hello RocketMQ " + + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ + ); + /* + * Call send message to deliver message to one of brokers. + */ + producer.sendOneway(msg); + } catch (Exception e) { + e.printStackTrace(); + Thread.sleep(1000); + } + } + /* + * Shut down once the producer instance is not longer in use. + */ + producer.shutdown(); + } +} + +``` \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/beb14e50/_sass/_navigation.scss ---------------------------------------------------------------------- diff --git a/_sass/_navigation.scss b/_sass/_navigation.scss index 6dfde7a..a5da695 100644 --- a/_sass/_navigation.scss +++ b/_sass/_navigation.scss @@ -467,8 +467,8 @@ .nav__sub-title { display: block; - margin: 0.3rem 0; - padding: 0.3rem 0; + margin: 0.5rem 0; + padding: 0.5rem 0; font-family: $sans-serif-narrow; font-size: $type-size-6; font-weight: bold; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/beb14e50/assets/images/rmq-basic-arc.png ---------------------------------------------------------------------- diff --git a/assets/images/rmq-basic-arc.png b/assets/images/rmq-basic-arc.png new file mode 100644 index 0000000..33927be Binary files /dev/null and b/assets/images/rmq-basic-arc.png differ http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-site/blob/beb14e50/assets/images/rmq-basic-component.png ---------------------------------------------------------------------- diff --git a/assets/images/rmq-basic-component.png b/assets/images/rmq-basic-component.png new file mode 100644 index 0000000..4cdc306 Binary files /dev/null and b/assets/images/rmq-basic-component.png differ
