wlliqipeng commented on a change in pull request #655: [RIP-9] Add the simple 
example description
URL: https://github.com/apache/rocketmq/pull/655#discussion_r248918290
 
 

 ##########
 File path: docs/cn/RocketMQ_Example.md
 ##########
 @@ -0,0 +1,957 @@
+样例(sample)
+============
+
+基本样例
+--------
+
+在基本样例中我们提供如下的功能场景:
+
+* 使用RocketMQ发送三种类型的消息:同步消息,异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。
+* 使用RocketMQ来消费接收到的消息。
+
+### 1、加入依赖:
+
+`maven:`
+```
+<dependency> 
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-client</artifactId>
+    <version>4.3.0</version>  
+</dependency>
+```
+`gradle`
+```
+compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
+```
+### 2、消息发送
+
+#### 1. Producer端发送同步消息
+
+这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
+```java
+public class SyncProducer {
+       public static void main(String[] args) throws Exception {
+       // 实例化消息生产者Producer
+        DefaultMQProducer producer = new 
DefaultMQProducer("please_rename_unique_group_name");
+       // 设置NameServer的地址
+       producer.setNamesrvAddr("localhost:9876");
+       // 启动Producer实例
+        producer.start();
+       for (int i = 0; i < 100; i++) {
+               // 创建消息,并指定Topic,Tag和消息体
+               Message msg = new Message("TopicTest" /* Topic */,
+               "TagA" /* Tag */,
+               ("Hello RocketMQ " + 
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
+               );
+               // 发送消息到一个Broker
+            SendResult sendResult = producer.send(msg);
+            // 通过sendResult返回消息是否成功送达
+               System.out.printf("%s%n", sendResult);
+       }
+       // 如果不再发送消息,关闭Producer实例。
+        producer.shutdown();
+       }
+}
+```
+#### 2. 发送异步消息
+
+异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
+
+```java
+public class AsyncProducer {
+       public static void main(String[] args) throws Exception {
+       // 实例化消息生产者Producer
+        DefaultMQProducer producer = new 
DefaultMQProducer("please_rename_unique_group_name");
+       // 设置NameServer的地址
+        producer.setNamesrvAddr("localhost:9876");
+       // 启动Producer实例
+        producer.start();
+        producer.setRetryTimesWhenSendAsyncFailed(0);
+       for (int i = 0; i < 100; i++) {
+                final int index = i;
+               // 创建消息,并指定Topic,Tag和消息体
+                Message msg = new Message("TopicTest",
+                    "TagA",
+                    "OrderID188",
+                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+                // SendCallback接收异步返回结果的回调
+                producer.send(msg, new SendCallback() {
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                        System.out.printf("%-10d OK %s %n", index,
+                            sendResult.getMsgId());
+                    }
+                    @Override
+                    public void onException(Throwable e) {
+                     System.out.printf("%-10d Exception %s %n", index, e);
+                     e.printStackTrace();
+                    }
+               });
+       }
+       // 如果不再发送消息,关闭Producer实例。
+        producer.shutdown();
+       }
+}
+```
+
+#### 3. 单向发送消息
+
+这种方式主要用在不特别关心发送结果的场景,例如日志发送。
+
+```java
+
+public class OnewayProducer {
+       public static void main(String[] args) throws Exception{
+       // 实例化消息生产者Producer
+        DefaultMQProducer producer = new 
DefaultMQProducer("please_rename_unique_group_name");
+       // 设置NameServer的地址
+        producer.setNamesrvAddr("localhost:9876");
+       // 启动Producer实例
+        producer.start();
+       for (int i = 0; i < 100; i++) {
+               // 创建消息,并指定Topic,Tag和消息体
+               Message msg = new Message("TopicTest" /* Topic */,
+                "TagA" /* Tag */,
+                ("Hello RocketMQ " + 
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
+               );
+               // 发送单向消息,没有任何返回结果
+               producer.sendOneway(msg);
+ 
+       }
+       // 如果不再发送消息,关闭Producer实例。
+        producer.shutdown();
+       }
+```
+
+### 3、消费消息
+
+```java
+
+public class Consumer {
+ 
+       public static void main(String[] args) throws InterruptedException, 
MQClientException {
+ 
+       // 实例化消费者
+        DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer("please_rename_unique_group_name");
+       
+       // 设置NameServer的地址
+        consumer.setNamesrvAddr("localhost:9876");
+       
+       // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
+        consumer.subscribe("TopicTest", "*");
+       // 注册回调实现类来处理从broker拉取回来的消息
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+ 
+            @Override
+               public ConsumeConcurrentlyStatus 
consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                System.out.printf("%s Receive New Messages: %s %n", 
Thread.currentThread().getName(), msgs);
+               // 标记该消息已经被成功消费
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+               }
+       });
+ 
+       // 启动消费者实例
+        consumer.start();
+ 
+        System.out.printf("Consumer Started.%n");
+       }
+}
+
+```
+
+
+顺序消息样例
+----------
+
+消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
+
+顺序消费的原理解析,在默认的情况下消息发送会采取Round 
Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
+
+下面用订单进行示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
+
+### 1、顺序消息生产
+
+```java
+
+package org.apache.rocketmq.example.order2;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+* Producer,发送顺序消息
+*/
+public class Producer {
+
+   public static void main(String[] args) throws Exception {
+       DefaultMQProducer producer = new 
DefaultMQProducer("please_rename_unique_group_name");
+
+       producer.setNamesrvAddr("127.0.0.1:9876");
+
+       producer.start();
+
+       String[] tags = new String[]{"TagA", "TagC", "TagD"};
+
+       // 订单列表
+       List<OrderStep> orderList = new Producer().buildOrders();
+
+       Date date = new Date();
+       SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+       String dateStr = sdf.format(date);
+       for (int i = 0; i < 10; i++) {
+           // 加个时间前缀
+           String body = dateStr + " Hello RocketMQ " + orderList.get(i);
+           Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" 
+ i, body.getBytes());
+
+           SendResult sendResult = producer.send(msg, new 
MessageQueueSelector() {
+               @Override
+               public MessageQueue select(List<MessageQueue> mqs, Message msg, 
Object arg) {
+                   Long id = (Long) arg;  //根据订单id选择发送queue
+                   long index = id % mqs.size();
+                   return mqs.get((int) index);
+               }
+           }, orderList.get(i).getOrderId());//订单id
+
+           System.out.println(String.format("SendResult status:%s, queueId:%d, 
body:%s",
+               sendResult.getSendStatus(),
+               sendResult.getMessageQueue().getQueueId(),
+               body));
+       }
+
+       producer.shutdown();
+   }
+
+   /**
+    * 订单的步骤
+    */
+   private static class OrderStep {
+       private long orderId;
+       private String desc;
+
+       public long getOrderId() {
+           return orderId;
+       }
+
+       public void setOrderId(long orderId) {
+           this.orderId = orderId;
+       }
+
+       public String getDesc() {
+           return desc;
+       }
+
+       public void setDesc(String desc) {
+           this.desc = desc;
+       }
+
+       @Override
+       public String toString() {
+           return "OrderStep{" +
+               "orderId=" + orderId +
+               ", desc='" + desc + '\'' +
+               '}';
+       }
+   }
+
+   /**
+    * 生成模拟订单数据
+    */
+   private List<OrderStep> buildOrders() {
+       List<OrderStep> orderList = new ArrayList<OrderStep>();
+
+       OrderStep orderDemo = new OrderStep();
+       orderDemo.setOrderId(15103111039L);
+       orderDemo.setDesc("创建");
+       orderList.add(orderDemo);
+
+       orderDemo = new OrderStep();
+       orderDemo.setOrderId(15103111065L);
+       orderDemo.setDesc("创建");
+       orderList.add(orderDemo);
+
+       orderDemo = new OrderStep();
+       orderDemo.setOrderId(15103111039L);
+       orderDemo.setDesc("付款");
+       orderList.add(orderDemo);
+
+       orderDemo = new OrderStep();
+       orderDemo.setOrderId(15103117235L);
+       orderDemo.setDesc("创建");
+       orderList.add(orderDemo);
+
+       orderDemo = new OrderStep();
+       orderDemo.setOrderId(15103111065L);
+       orderDemo.setDesc("付款");
+       orderList.add(orderDemo);
+
+       orderDemo = new OrderStep();
+       orderDemo.setOrderId(15103117235L);
+       orderDemo.setDesc("付款");
+       orderList.add(orderDemo);
+
+       orderDemo = new OrderStep();
+       orderDemo.setOrderId(15103111065L);
+       orderDemo.setDesc("完成");
+       orderList.add(orderDemo);
+
+       orderDemo = new OrderStep();
+       orderDemo.setOrderId(15103111039L);
+       orderDemo.setDesc("推送");
+       orderList.add(orderDemo);
+
+       orderDemo = new OrderStep();
+       orderDemo.setOrderId(15103117235L);
+       orderDemo.setDesc("完成");
+       orderList.add(orderDemo);
+
+       orderDemo = new OrderStep();
+       orderDemo.setOrderId(15103111039L);
+       orderDemo.setDesc("完成");
+       orderList.add(orderDemo);
+
+       return orderList;
+   }
+}
+
+```
+
+### 2、顺序消费消息
+
+```java
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import java.util.List;
+
+package org.apache.rocketmq.example.order2;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
+*/
+public class ConsumerInOrder {
+
+   public static void main(String[] args) throws Exception {
+       DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer("please_rename_unique_group_name_3");
+       consumer.setNamesrvAddr("127.0.0.1:9876");
+       /**
+        * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
+        * 如果非第一次启动,那么按照上次消费的位置继续消费
+        */
+       
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+
+       consumer.subscribe("TopicTest", "TagA || TagC || TagD");
+
+       consumer.registerMessageListener(new MessageListenerOrderly() {
+
+           Random random = new Random();
+
+           @Override
+           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, 
ConsumeOrderlyContext context) {
+               context.setAutoCommit(true);
+               for (MessageExt msg : msgs) {
+                   // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
+                   System.out.println("consumeThread=" + 
Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" 
+ new String(msg.getBody()));
+               }
+
+               try {
+                   //模拟业务逻辑处理中...
+                   TimeUnit.SECONDS.sleep(random.nextInt(10));
+               } catch (Exception e) {
+                   e.printStackTrace();
+               }
+               return ConsumeOrderlyStatus.SUCCESS;
+           }
+       });
+
+       consumer.start();
+
+       System.out.println("Consumer Started.");
+   }
+}
+
+```
+
+延时消息样例
+----------
+
 
 Review comment:
   能否简单介绍下目前支持的延时消息基本情况、限制以及应用场景

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to