This is an automated email from the ASF dual-hosted git repository.

tigerlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9be760a  add pull consumer
     new 18c7979  Merge pull request #1085 from andrexuDeveloper/develop
9be760a is described below

commit 9be760a62c9ce532dcf99fda5d269dd6b8b61480
Author: xuhongcao <[email protected]>
AuthorDate: Thu Mar 14 11:01:31 2019 +0800

    add pull consumer
---
 .../java/API_Reference_ DefaultPullConsumer.md     | 143 +++++++++++++++++++++
 1 file changed, 143 insertions(+)

diff --git a/docs/cn/client/java/API_Reference_ DefaultPullConsumer.md 
b/docs/cn/client/java/API_Reference_ DefaultPullConsumer.md
new file mode 100644
index 0000000..64a2b7b
--- /dev/null
+++ b/docs/cn/client/java/API_Reference_ DefaultPullConsumer.md 
@@ -0,0 +1,143 @@
+## DefaultPullConsumer
+---
+### 类简介
+
+1. `DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer`
+
+2. 
`DefaultMQPullConsumer`主动的从Broker拉取消息,主动权由应用控制,可以实现批量的消费消息。Pull方式取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,也可以自定义与控制offset位置。
+                        
+3. 
优势:consumer可以按需消费,不用担心自己处理能力,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适消息延迟与忙等。
+                        
+4. 缺点:由于主动权在消费方,消费方无法及时获取最新的消息。比较适合不及时批处理场景。
+                        
+``` java 
+
+ 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+ 
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+ 
+public class MQPullConsumer {
+ 
+       private static final Map<MessageQueue,Long> OFFSE_TABLE = new 
HashMap<MessageQueue,Long>();
+       
+       public static void main(String[] args) throws MQClientException {
+               DefaultMQPullConsumer consumer = new 
DefaultMQPullConsumer("groupName");
+               consumer.setNamesrvAddr("127.0.0.1:9876");
+               consumer.start();
+               // 从指定topic中拉取所有消息队列
+               Set<MessageQueue> mqs = 
consumer.fetchSubscribeMessageQueues("order-topic");
+               for(MessageQueue mq:mqs){
+                       try {
+                               // 获取消息的offset,指定从store中获取
+                               long offset = 
consumer.fetchConsumeOffset(mq,true);
+                               System.out.println("consumer from the 
queue:"+mq+":"+offset);
+                               while(true){
+                                       PullResult pullResult = 
consumer.pullBlockIfNotFound(mq, null, 
+                                                       
getMessageQueueOffset(mq), 32);
+                                       
putMessageQueueOffset(mq,pullResult.getNextBeginOffset());
+                                       switch(pullResult.getPullStatus()){
+                                       case FOUND:
+                                               List<MessageExt> messageExtList 
= pullResult.getMsgFoundList();
+                        for (MessageExt m : messageExtList) {
+                            System.out.println(new String(m.getBody()));
+                        }
+                                               break;
+                                       case NO_MATCHED_MSG:
+                                               break;
+                                       case NO_NEW_MSG:
+                                               break;
+                                       case OFFSET_ILLEGAL:
+                                               break;
+                                       }
+                               }
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+               }
+               consumer.shutdown();
+       }
+ 
+       // 保存上次消费的消息下标
+       private static void putMessageQueueOffset(MessageQueue mq,
+                       long nextBeginOffset) {
+               OFFSE_TABLE.put(mq, nextBeginOffset);
+       }
+       
+       // 获取上次消费的消息的下标
+       private static Long getMessageQueueOffset(MessageQueue mq) {
+               Long offset = OFFSE_TABLE.get(mq);
+               if(offset != null){
+                       return offset;
+               }
+               return 0l;
+       }
+       
+ 
+}
+```
+
+
+
+### 字段摘要
+|类型|字段名称|描述|
+|------|-------|-------|
+|DefaultMQPullConsumerImpl|defaultMQPullConsumerImpl|DefaultMQPullConsumer的内部核心处理默认实现|
+|String|consumerGroup|消费的唯一分组|
+|long|brokerSuspendMaxTimeMillis|consumer取连接broker的最大延迟时间,不建议修改|
+|long|consumerTimeoutMillisWhenSuspend|pull取连接的最大超时时间,必须大于brokerSuspendMaxTimeMillis,不建议修改|
+|long|consumerPullTimeoutMillis|socket连接的最大超时时间,不建议修改|
+|String|messageModel|默认cluster模式|
+|int|messageQueueListener|消息queue监听器,用来获取topic的queue变化|
+|int|offsetStore|RemoteBrokerOffsetStore 远程与本地offset存储器|
+|int|registerTopics|注册到该consumer的topic集合|
+|int|allocateMessageQueueStrategy|consumer的默认获取queue的负载分配策略算法|
+
+### 构造方法摘要
+
+|方法名称|方法描述|
+|-------|------------|
+|DefaultMQPullConsumer()|由默认参数值创建一个Pull消费者 |
+|DefaultMQPullConsumer(final String consumerGroup, RPCHook 
rpcHook)|使用指定的分组名,hook创建一个消费者|
+|DefaultMQPullConsumer(final String consumerGroup)|使用指定的分组名消费者|
+|DefaultMQPullConsumer(RPCHook rpcHook)|使用指定的hook创建一个生产者|
+
+
+### 使用方法摘要
+
+|返回值|方法名称|方法描述|
+|-------|-------|------------|
+|MQAdmin接口method|-------|------------|
+|void|createTopic(String key, String newTopic, int 
queueNum)|在broker上创建指定的topic|
+|void|createTopic(String key, String newTopic, int queueNum, int 
topicSysFlag)|在broker上创建指定的topic|
+|long|earliestMsgStoreTime(MessageQueue mq)|查询最早的消息存储时间|
+|long|maxOffset(MessageQueue mq)|查询给定消息队列的最大offset|
+|long|minOffset(MessageQueue mq)|查询给定消息队列的最小offset|
+|QueryResult|queryMessage(String topic, String key, int maxNum, long begin, 
long end)|按关键字查询消息|
+|long|searchOffset(MessageQueue mq, long timestamp)|查找指定时间的消息队列的物理offset|
+|MessageExt|viewMessage(String offsetMsgId)|根据给定的msgId查询消息|
+|MessageExt|public MessageExt viewMessage(String topic, String 
msgId)|根据给定的msgId查询消息,并指定topic|
+|MQConsumer接口method|-------|------------|
+|Set<MessageQueue>|fetchSubscribeMessageQueues(String topic)|根据topic获取订阅的Queue|
+|void|sendMessageBack(final MessageExt msg, final int 
delayLevel)|如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL|
+|void|sendMessageBack(final MessageExt msg, final int delayLevel, final String 
brokerName)|如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL|
+|MQPullConsumer接口method|-------|------------|
+|long|fetchConsumeOffset(MessageQueue mq, boolean fromStore)|查询给定消息队列的最大offset|
+|PullResult |pull(final MessageQueue mq, final String subExpression, final 
long offset,final int maxNums)|异步拉取制定匹配的消息|
+|PullResult| pull(final MessageQueue mq, final String subExpression, final 
long offset,final int maxNums, final long timeout)|异步拉取制定匹配的消息|
+|PullResult|pull(final MessageQueue mq, final MessageSelector selector, final 
long offset,final int 
maxNums)|异步拉取制定匹配的消息,通过MessageSelector器来过滤消息,参考org.apache.rocketmq.common.filter.ExpressionType|
+|PullResult|pullBlockIfNotFound(final MessageQueue mq, final String 
subExpression,final long offset, final int 
maxNums)|异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis|
+|void|pullBlockIfNotFound(final MessageQueue mq, final String subExpression, 
final long offset,final int maxNums, final PullCallback 
pullCallback)|异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis,通过回调pullCallback来消费|
    
+|void|updateConsumeOffset(final MessageQueue mq, final long 
offset)|更新指定mq的offset|
+|long|fetchMessageQueuesInBalance(String 
topic)|根据topic获取订阅的Queue(是balance分配后的)|
+|void|void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, 
String 
consumerGroup)|如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL,消息可能在同一个consumerGroup消费|
+|void|shutdown()|关闭当前消费者实例并释放相关资源|
+|void|start()|启动消费者|
+

Reply via email to