This is an automated email from the ASF dual-hosted git repository.

tigerlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 401dc8e  production level pull api demo
     new 1e8e728  Merge pull request #3295 from lwclover/develop
401dc8e is described below

commit 401dc8eaf11b24110786884f28a8406117ef2224
Author: sunhangda <[email protected]>
AuthorDate: Fri Aug 27 11:17:46 2021 +0800

    production level pull api demo
    
    production level pull api demo
---
 .../rocketmq/example/simple/PullConsumer.java      | 154 +++++++++++++++------
 1 file changed, 111 insertions(+), 43 deletions(-)

diff --git 
a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java 
b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
index 8aec7e3..c6c706b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
@@ -16,63 +16,131 @@
  */
 package org.apache.rocketmq.example.simple;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 
+@SuppressWarnings("deprecation")
 public class PullConsumer {
-    private static final Map<MessageQueue, Long> OFFSE_TABLE = new 
HashMap<MessageQueue, Long>();
 
     public static void main(String[] args) throws MQClientException {
+       
         DefaultMQPullConsumer consumer = new 
DefaultMQPullConsumer("please_rename_unique_group_name_5");
         consumer.setNamesrvAddr("127.0.0.1:9876");
+        Set<String> topics = new HashSet<>();
+        //You had better register topics; they will be used in rebalance when 
starting
+        topics.add("TopicTest");
+        consumer.setRegisterTopics(topics);
         consumer.start();
 
-        Set<MessageQueue> mqs = 
consumer.fetchSubscribeMessageQueues("broker-a");
-        for (MessageQueue mq : mqs) {
-            System.out.printf("Consume from the queue: %s%n", mq);
-            SINGLE_MQ:
-            while (true) {
-                try {
-                    PullResult pullResult =
-                        consumer.pullBlockIfNotFound(mq, null, 
getMessageQueueOffset(mq), 32);
-                    System.out.printf("%s%n", pullResult);
-                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
-                    switch (pullResult.getPullStatus()) {
-                        case FOUND:
-                            break;
-                        case NO_MATCHED_MSG:
-                            break;
-                        case NO_NEW_MSG:
-                            break SINGLE_MQ;
-                        case OFFSET_ILLEGAL:
-                            break;
-                        default:
-                            break;
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
+        ExecutorService executors = 
Executors.newFixedThreadPool(topics.size(), new ThreadFactory() {
+                       @Override
+                       public Thread newThread(Runnable r) {
+                return new Thread(r, "PullConsumerThread");
             }
+               });
+        for(String topic : consumer.getRegisterTopics()){
+               
+               executors.execute(new Runnable() {
+                       
+                       public void doSomething(List<MessageExt> msgs){
+                               //do your business
+                               System.out.println(msgs);
+                       }
+                               @Override
+                               public void run() {
+                                       while(true){
+                                               try {
+                                                       Set<MessageQueue> 
messageQueues =  consumer.fetchMessageQueuesInBalance(topic);
+                                                       if(messageQueues == 
null || messageQueues.isEmpty()){
+                                                               
Thread.sleep(1000);
+                                                               continue;
+                                                       }
+                                                       PullResult pullResult = 
null;
+                                                       for(MessageQueue 
messageQueue : messageQueues){
+                                                               try {
+                                                                       long 
offset = this.consumeFromOffset(messageQueue);
+                                                                       
pullResult = consumer.pull(messageQueue, "*", offset, 32);
+                                                                       switch 
(pullResult.getPullStatus()) {
+                                               case FOUND:
+                                                       List<MessageExt> msgs = 
pullResult.getMsgFoundList();
+                                                       
+                                                       if(msgs != null && 
!msgs.isEmpty()){
+                                                               
this.doSomething(msgs);
+                                                               //update offset 
to broker
+                                                               
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
+                                                               //print pull tps
+                                                                               
        this.incPullTPS(topic, pullResult.getMsgFoundList().size());
+                                                       }
+                                                       break;
+                                               case OFFSET_ILLEGAL:
+                                                       
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
+                                                       break;
+                                               case NO_NEW_MSG:
+                                                       Thread.sleep(1);
+                                                       
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
+                                                       break;
+                                               case NO_MATCHED_MSG:
+                                                       
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
+                                                       break;
+                                               default:
+                                                               }
+                                                               } catch 
(RemotingException e) {
+                                                                       
e.printStackTrace();
+                                                               } catch 
(MQBrokerException e) {
+                                                                       
e.printStackTrace();
+                                                               } catch 
(Exception e){
+                                                                       
e.printStackTrace();
+                                                               }
+                                                       }
+                                               } catch (MQClientException e) {
+                                                       //rebalance error
+                                                       e.printStackTrace();
+                                               } catch (InterruptedException 
e) {
+                                                       e.printStackTrace();
+                                               } catch (Exception e){
+                                                       e.printStackTrace();
+                                               }
+                                       }
+                               }
+                               
+                               public long consumeFromOffset(MessageQueue 
messageQueue) throws MQClientException{
+                                       //-1 when started
+                                       long offset = 
consumer.getOffsetStore().readOffset(messageQueue, 
ReadOffsetType.READ_FROM_MEMORY);
+                                       if(offset < 0){
+                                               //query from broker
+                                               offset = 
consumer.getOffsetStore().readOffset(messageQueue, 
ReadOffsetType.READ_FROM_STORE);
+                                       }
+                    if (offset < 0){
+                       //first time start from last offset
+                       offset = consumer.maxOffset(messageQueue);
+                    }
+                    //make sure the offset is never negative
+                    if (offset < 0){
+                       offset = 0;
+                    }
+                                       return offset;
+                               }
+                               public void incPullTPS(String topic, int 
pullSize) {
+                                       
consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
+                                                       
.getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic, 
pullSize);
+                               }
+               });
+               
         }
-
-        consumer.shutdown();
+//        executors.shutdown();
+//        consumer.shutdown();
     }
-
-    private static long getMessageQueueOffset(MessageQueue mq) {
-        Long offset = OFFSE_TABLE.get(mq);
-        if (offset != null)
-            return offset;
-
-        return 0;
-    }
-
-    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
-        OFFSE_TABLE.put(mq, offset);
-    }
-
 }

Reply via email to