This is an automated email from the ASF dual-hosted git repository.
tigerlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 401dc8e production level pull api demo
new 1e8e728 Merge pull request #3295 from lwclover/develop
401dc8e is described below
commit 401dc8eaf11b24110786884f28a8406117ef2224
Author: sunhangda <[email protected]>
AuthorDate: Fri Aug 27 11:17:46 2021 +0800
production level pull api demo
production level pull api demo
---
.../rocketmq/example/simple/PullConsumer.java | 154 +++++++++++++++------
1 file changed, 111 insertions(+), 43 deletions(-)
diff --git
a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
index 8aec7e3..c6c706b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullConsumer.java
@@ -16,63 +16,131 @@
*/
package org.apache.rocketmq.example.simple;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+@SuppressWarnings("deprecation")
public class PullConsumer {
- private static final Map<MessageQueue, Long> OFFSE_TABLE = new
HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
+
DefaultMQPullConsumer consumer = new
DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
+ Set<String> topics = new HashSet<>();
+ //You would better to register topics,It will use in rebalance when
starting
+ topics.add("TopicTest");
+ consumer.setRegisterTopics(topics);
consumer.start();
- Set<MessageQueue> mqs =
consumer.fetchSubscribeMessageQueues("broker-a");
- for (MessageQueue mq : mqs) {
- System.out.printf("Consume from the queue: %s%n", mq);
- SINGLE_MQ:
- while (true) {
- try {
- PullResult pullResult =
- consumer.pullBlockIfNotFound(mq, null,
getMessageQueueOffset(mq), 32);
- System.out.printf("%s%n", pullResult);
- putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
- switch (pullResult.getPullStatus()) {
- case FOUND:
- break;
- case NO_MATCHED_MSG:
- break;
- case NO_NEW_MSG:
- break SINGLE_MQ;
- case OFFSET_ILLEGAL:
- break;
- default:
- break;
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
+ ExecutorService executors =
Executors.newFixedThreadPool(topics.size(), new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "PullConsumerThread");
}
+ });
+ for(String topic : consumer.getRegisterTopics()){
+
+ executors.execute(new Runnable() {
+
+ public void doSomething(List<MessageExt> msgs){
+ //do you business
+ System.out.println(msgs);
+ }
+ @Override
+ public void run() {
+ while(true){
+ try {
+ Set<MessageQueue>
messageQueues = consumer.fetchMessageQueuesInBalance(topic);
+ if(messageQueues ==
null || messageQueues.isEmpty()){
+
Thread.sleep(1000);
+ continue;
+ }
+ PullResult pullResult =
null;
+ for(MessageQueue
messageQueue : messageQueues){
+ try {
+ long
offset = this.consumeFromOffset(messageQueue);
+
pullResult = consumer.pull(messageQueue, "*", offset, 32);
+ switch
(pullResult.getPullStatus()) {
+ case FOUND:
+ List<MessageExt> msgs =
pullResult.getMsgFoundList();
+
+ if(msgs != null &&
!msgs.isEmpty()){
+
this.doSomething(msgs);
+ //update offset
to broker
+
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
+ //print pull tps
+
this.incPullTPS(topic, pullResult.getMsgFoundList().size());
+ }
+ break;
+ case OFFSET_ILLEGAL:
+
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
+ break;
+ case NO_NEW_MSG:
+ Thread.sleep(1);
+
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
+ break;
+ case NO_MATCHED_MSG:
+
consumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
+ break;
+ default:
+ }
+ } catch
(RemotingException e) {
+
e.printStackTrace();
+ } catch
(MQBrokerException e) {
+
e.printStackTrace();
+ } catch
(Exception e){
+
e.printStackTrace();
+ }
+ }
+ } catch (MQClientException e) {
+ //reblance error
+ e.printStackTrace();
+ } catch (InterruptedException
e) {
+ e.printStackTrace();
+ } catch (Exception e){
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public long consumeFromOffset(MessageQueue
messageQueue) throws MQClientException{
+ //-1 when started
+ long offset =
consumer.getOffsetStore().readOffset(messageQueue,
ReadOffsetType.READ_FROM_MEMORY);
+ if(offset < 0){
+ //query from broker
+ offset =
consumer.getOffsetStore().readOffset(messageQueue,
ReadOffsetType.READ_FROM_STORE);
+ }
+ if (offset < 0){
+ //first time start from last offset
+ offset = consumer.maxOffset(messageQueue);
+ }
+ //make sure
+ if (offset < 0){
+ offset = 0;
+ }
+ return offset;
+ }
+ public void incPullTPS(String topic, int
pullSize) {
+
consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory()
+
.getConsumerStatsManager().incPullTPS(consumer.getConsumerGroup(), topic,
pullSize);
+ }
+ });
+
}
-
- consumer.shutdown();
+// executors.shutdown();
+// consumer.shutdown();
}
-
- private static long getMessageQueueOffset(MessageQueue mq) {
- Long offset = OFFSE_TABLE.get(mq);
- if (offset != null)
- return offset;
-
- return 0;
- }
-
- private static void putMessageQueueOffset(MessageQueue mq, long offset) {
- OFFSE_TABLE.put(mq, offset);
- }
-
}