Jaskey closed pull request #66: [ROCKETMQ-106] Add flow control on topic level
URL: https://github.com/apache/rocketmq/pull/66
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not keep its diff
visible after the merge, so the full diff is reproduced below:

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 9c9b59ef..cf55a951 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -166,11 +166,18 @@
     private int consumeConcurrentlyMaxSpan = 2000;
 
     /**
-     * Flow control threshold
+     * Flow control threshold on queue level
      */
     private int pullThresholdForQueue = 1000;
 
     /**
+     * Introduced since 4.1.x
+     * Flow control threshold on topic level. The value should be greater than or equal to pullThresholdForQueue; otherwise flow control for the topic will always take effect before flow control on the queue.
+     * By default, it is set to Integer.MAX_VALUE, which means flow control for the topic is disabled.
+     */
+    private int pullThresholdForTopic = Integer.MAX_VALUE;
+
+    /**
      * Message pull Interval
      */
     private long pullInterval = 0;
@@ -401,6 +408,14 @@ public void setPullThresholdForQueue(int 
pullThresholdForQueue) {
         this.pullThresholdForQueue = pullThresholdForQueue;
     }
 
+    public int getPullThresholdForTopic() {
+        return pullThresholdForTopic;
+    }
+
+    public void setPullThresholdForTopic(int pullThresholdForTopic) {
+        this.pullThresholdForTopic = pullThresholdForTopic;
+    }
+
     public Map<String, String> getSubscription() {
         return subscription;
     }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 9bf34be8..dbcca59d 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -106,8 +106,9 @@
     private MessageListener messageListenerInner;
     private OffsetStore offsetStore;
     private ConsumeMessageService consumeMessageService;
-    private long flowControlTimes1 = 0;
-    private long flowControlTimes2 = 0;
+    private long topicFlowControlTimes = 0;
+    private long queueFlowControlTimes = 0;
+    private long queueMaxSpanFlowControlTimes = 0;
 
     public DefaultMQPushConsumerImpl(DefaultMQPushConsumer 
defaultMQPushConsumer, RPCHook rpcHook) {
         this.defaultMQPushConsumer = defaultMQPushConsumer;
@@ -219,13 +220,37 @@ public void pullMessage(final PullRequest pullRequest) {
             return;
         }
 
+        //flow control for topic
+        if (this.defaultMQPushConsumer.getPullThresholdForTopic() != 
Integer.MAX_VALUE) {
+            Map<MessageQueue, ProcessQueue> allProcessQMap = 
this.getRebalanceImpl().getProcessQueueTable();
+            Iterator<Entry<MessageQueue, ProcessQueue>> it = 
allProcessQMap.entrySet().iterator();
+            long sizeOfAllQueue = 0;
+            //pick the relevant process queues and calculate size
+            while (it.hasNext()) {
+                Entry<MessageQueue, ProcessQueue> entry = it.next();
+                if 
(pullRequest.getMessageQueue().getTopic().equals(entry.getKey().getTopic())) {
+                    sizeOfAllQueue += entry.getValue().getMsgCount().get();
+                }
+            }
+            if (sizeOfAllQueue > 
this.defaultMQPushConsumer.getPullThresholdForTopic()) {
+                this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+                if ((topicFlowControlTimes++ % 1000) == 0) {
+                    log.warn(
+                        "the consumer message topic buffer is full, so do flow 
control, minOffset={}, maxOffset={}, sizeOfAllQueue={}, pullRequest={}, 
flowControlTimes={}",
+                        processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), sizeOfAllQueue, pullRequest, 
topicFlowControlTimes);
+                }
+                return;
+            }
+        }
+
+        //flow control for queue
         long size = processQueue.getMsgCount().get();
         if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
             this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
-            if ((flowControlTimes1++ % 1000) == 0) {
+            if ((queueFlowControlTimes++ % 1000) == 0) {
                 log.warn(
-                    "the consumer message buffer is full, so do flow control, 
minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
-                    processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
+                    "the consumer message queue buffer is full, so do flow 
control, minOffset={}, maxOffset={}, size={}, pullRequest={}, 
flowControlTimes={}",
+                    processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), size, pullRequest, 
queueFlowControlTimes);
             }
             return;
         }
@@ -233,11 +258,11 @@ public void pullMessage(final PullRequest pullRequest) {
         if (!this.consumeOrderly) {
             if (processQueue.getMaxSpan() > 
this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                 this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
-                if ((flowControlTimes2++ % 1000) == 0) {
+                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                     log.warn(
                         "the queue's messages, span too long, so do flow 
control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, 
flowControlTimes={}",
                         processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
-                        pullRequest, flowControlTimes2);
+                        pullRequest, queueMaxSpanFlowControlTimes);
                 }
                 return;
             }
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index cdf1d780..1ad71fc8 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -19,10 +19,15 @@
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -31,6 +36,7 @@
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
@@ -52,6 +58,7 @@
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -109,7 +116,6 @@ public void init() throws Exception {
         field.setAccessible(true);
         field.set(pushConsumerImpl, mQClientFactory);
 
-
         field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
         field.setAccessible(true);
         field.set(mQClientFactory, mQClientAPIImpl);
@@ -125,27 +131,45 @@ public void init() throws Exception {
         when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), 
any(PullMessageRequestHeader.class),
             anyLong(), any(CommunicationMode.class), 
nullable(PullCallback.class)))
             .thenAnswer(new Answer<Object>() {
-            @Override public Object answer(InvocationOnMock mock) throws 
Throwable {
-                PullMessageRequestHeader requestHeader = mock.getArgument(1);
-                MessageClientExt messageClientExt = new MessageClientExt();
-                messageClientExt.setTopic(topic);
-                messageClientExt.setQueueId(0);
-                messageClientExt.setMsgId("123");
-                messageClientExt.setBody(new byte[] {'a'});
-                messageClientExt.setOffsetMsgId("234");
-                messageClientExt.setBornHost(new InetSocketAddress(8080));
-                messageClientExt.setStoreHost(new InetSocketAddress(8080));
-                PullResult pullResult = createPullResult(requestHeader, 
PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
-                ((PullCallback)mock.getArgument(4)).onSuccess(pullResult);
-                return pullResult;
-            }
-        });
+                @Override public Object answer(InvocationOnMock mock) throws 
Throwable {
+                    PullMessageRequestHeader requestHeader = 
mock.getArgument(1);
+                    MessageClientExt messageClientExt = new MessageClientExt();
+                    messageClientExt.setTopic(topic);
+                    messageClientExt.setQueueId(((PullMessageRequestHeader) 
mock.getArgument(1)).getQueueId());
+                    messageClientExt.setMsgId("123");
+                    messageClientExt.setBody(new byte[] {'a'});
+                    messageClientExt.setOffsetMsgId("234");
+                    messageClientExt.setBornHost(new InetSocketAddress(8080));
+                    messageClientExt.setStoreHost(new InetSocketAddress(8080));
+                    
messageClientExt.setQueueOffset(((PullMessageRequestHeader) 
mock.getArgument(1)).getQueueOffset());
+                    PullResult pullResult = createPullResult(requestHeader, 
PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+                    ((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
+                    return pullResult;
+                }
+            });
 
         doReturn(new FindBrokerResult("127.0.0.1:10911", 
false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), 
anyLong(), anyBoolean());
-        
doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(),
 anyString());
-        Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
-        messageQueueSet.add(createPullRequest().getMessageQueue());
+        doReturn(new 
ArrayList<String>(Collections.singletonList(mQClientFactory.getClientId()))).when(mQClientFactory).findConsumerIdList(anyString(),
 anyString());
+        //START: mock allocating 4 queue
+        final Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
+        messageQueueSet.add(createPullRequest(0).getMessageQueue());
+        messageQueueSet.add(createPullRequest(1).getMessageQueue());
+        messageQueueSet.add(createPullRequest(2).getMessageQueue());
+        messageQueueSet.add(createPullRequest(3).getMessageQueue());
+        pushConsumer.setAllocateMessageQueueStrategy(new 
AllocateMessageQueueStrategy() {
+            @Override public List<MessageQueue> allocate(String consumerGroup, 
String currentCID,
+                List<MessageQueue> mqAll, List<String> cidAll) {
+                return new ArrayList<MessageQueue>(messageQueueSet);
+            }
+
+            @Override public String getName() {
+                return "TEST";
+            }
+        });
+        rebalancePushImpl.getTopicSubscribeInfoTable().putIfAbsent(topic, 
messageQueueSet);
         
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, 
messageQueueSet);
+        //END: mock allocating 4 queue
+
         
doReturn(123L).when(rebalancePushImpl).computePullFromWhere(any(MessageQueue.class));
     }
 
@@ -166,15 +190,55 @@ public void testPullMessage_Success() throws 
InterruptedException, RemotingExcep
                 return null;
             }
         }));
-
-        PullMessageService pullMessageService = 
mQClientFactory.getPullMessageService();
-        pullMessageService.executePullRequestImmediately(createPullRequest());
+        pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();
         countDownLatch.await();
         assertThat(messageExts[0].getTopic()).isEqualTo(topic);
         assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
     }
 
     @Test
+    public void testTopicFlowControl() throws Exception {
+        final int flowControl = 200;
+        final int pullBatchSize = 1;
+        pushConsumer.setPullInterval(0);
+        pushConsumer.setPullBatchSize(pullBatchSize);
+        pushConsumer.setPullThresholdForQueue(flowControl / 2);//4 queue, each 
flow control in 100
+        pushConsumer.setPullThresholdForTopic(flowControl);//topic flow 
control in 200
+        pushConsumer.setConsumeThreadMin(200);
+        pushConsumer.setConsumeThreadMax(500);
+        
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new 
ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), 
new MessageListenerConcurrently() {
+            @Override public ConsumeConcurrentlyStatus 
consumeMessage(List<MessageExt> msgs,
+                ConsumeConcurrentlyContext context) {
+                for (MessageExt msg : msgs) {
+                    try {
+                        Thread.sleep(2000);//block some time to make it 
accumulated
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        }));
+        pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();
+        Thread.sleep(3000);//spend some time to consume
+
+        //START  : check flow control on topic level
+        Map<MessageQueue, ProcessQueue> allProcessQMap = 
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().getProcessQueueTable();
+        Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = 
allProcessQMap.entrySet().iterator();
+        long sizeOfAllQueue = 0;
+        //pick the relevant process queues and calculate size
+        while (it.hasNext()) {
+            Map.Entry<MessageQueue, ProcessQueue> entry = it.next();
+            if (topic.equals(entry.getKey().getTopic())) {
+                sizeOfAllQueue += entry.getValue().getMsgCount().get();
+            }
+        }
+        Assert.assertTrue("topic flow control does not work as expected, 
actual = " + sizeOfAllQueue + " flowControl = " + flowControl, sizeOfAllQueue 
<= flowControl + pullBatchSize);
+        //END  : check flow control on topic level
+
+    }
+
+    @Test
     public void testPullMessage_SuccessWithOrderlyService() throws Exception {
         final CountDownLatch countDownLatch = new CountDownLatch(1);
         final MessageExt[] messageExts = new MessageExt[1];
@@ -200,13 +264,17 @@ public ConsumeOrderlyStatus 
consumeMessage(List<MessageExt> msgs, ConsumeOrderly
     }
 
     private PullRequest createPullRequest() {
+        return createPullRequest(0);
+    }
+
+    private PullRequest createPullRequest(int queueId) {
         PullRequest pullRequest = new PullRequest();
         pullRequest.setConsumerGroup(consumerGroup);
         pullRequest.setNextOffset(1024);
 
         MessageQueue messageQueue = new MessageQueue();
         messageQueue.setBrokerName(brokerName);
-        messageQueue.setQueueId(0);
+        messageQueue.setQueueId(queueId);
         messageQueue.setTopic(topic);
         pullRequest.setMessageQueue(messageQueue);
         ProcessQueue processQueue = new ProcessQueue();
@@ -217,7 +285,8 @@ private PullRequest createPullRequest() {
         return pullRequest;
     }
 
-    private PullResultExt createPullResult(PullMessageRequestHeader 
requestHeader, PullStatus pullStatus, List<MessageExt> messageExtList) throws 
Exception {
+    private PullResultExt createPullResult(PullMessageRequestHeader 
requestHeader, PullStatus pullStatus,
+        List<MessageExt> messageExtList) throws Exception {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         for (MessageExt messageExt : messageExtList) {
             outputStream.write(MessageDecoder.encode(messageExt, false));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to