This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0b80f80  [ROCKETMQ-294] Do flow control on the number and size 
dimensions when pull message
0b80f80 is described below

commit 0b80f8092ed0f3fdae7ae8c3212a4286b7f24cfc
Author: yukon <[email protected]>
AuthorDate: Wed Oct 18 10:42:04 2017 +0800

    [ROCKETMQ-294] Do flow control on the number and size dimensions when pull 
message
    
    Author: yukon <[email protected]>
    
    Closes #171 from zhouxinyu/ROCKETMQ-294.
---
 .../client/consumer/DefaultMQPushConsumer.java     |  58 +++++++-
 .../impl/consumer/DefaultMQPushConsumerImpl.java   |  58 ++++++--
 .../client/impl/consumer/ProcessQueue.java         |  14 +-
 .../client/impl/consumer/RebalancePushImpl.java    |  20 +++
 .../client/consumer/DefaultMQPushConsumerTest.java |  55 +++++++
 .../client/impl/consumer/ProcessQueueTest.java     | 107 ++++++++++++++
 .../impl/consumer/RebalancePushImplTest.java       | 163 +++++++++++++++++++++
 .../common/protocol/body/ProcessQueueInfo.java     |  15 +-
 8 files changed, 476 insertions(+), 14 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index e32edc9..d51030a 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -166,11 +166,43 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
     private int consumeConcurrentlyMaxSpan = 2000;
 
     /**
-     * Flow control threshold
+     * Flow control threshold on queue level, each message queue will cache at 
most 1000 messages by default,
+     * Consider the {@code pullBatchSize}, the instantaneous value may exceed 
the limit
      */
     private int pullThresholdForQueue = 1000;
 
     /**
+     * Limit the cached message size on queue level, each message queue will 
cache at most 100 MiB messages by default,
+     * Consider the {@code pullBatchSize}, the instantaneous value may exceed 
the limit
+     *
+     * <p>
+     * The size of a message only measured by message body, so it's not 
accurate
+     */
+    private int pullThresholdSizeForQueue = 100;
+
+    /**
+     * Flow control threshold on topic level, default value is -1(Unlimited)
+     * <p>
+     * The value of {@code pullThresholdForQueue} will be overwrote and 
calculated based on
+     * {@code pullThresholdForTopic} if it is't unlimited
+     * <p>
+     * For example, if the value of pullThresholdForTopic is 1000 and 10 
message queues are assigned to this consumer,
+     * then pullThresholdForQueue will be set to 100
+     */
+    private int pullThresholdForTopic = -1;
+
+    /**
+     * Limit the cached message size on topic level, default value is -1 
MiB(Unlimited)
+     * <p>
+     * The value of {@code pullThresholdSizeForQueue} will be overwrote and 
calculated based on
+     * {@code pullThresholdSizeForTopic} if it is't unlimited
+     * <p>
+     * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 
10 message queues are
+     * assigned to this consumer, then pullThresholdSizeForQueue will be set 
to 100 MiB
+     */
+    private int pullThresholdSizeForTopic = -1;
+
+    /**
      * Message pull Interval
      */
     private long pullInterval = 0;
@@ -407,6 +439,30 @@ public class DefaultMQPushConsumer extends ClientConfig 
implements MQPushConsume
         this.pullThresholdForQueue = pullThresholdForQueue;
     }
 
+    public int getPullThresholdForTopic() {
+        return pullThresholdForTopic;
+    }
+
+    public void setPullThresholdForTopic(final int pullThresholdForTopic) {
+        this.pullThresholdForTopic = pullThresholdForTopic;
+    }
+
+    public int getPullThresholdSizeForQueue() {
+        return pullThresholdSizeForQueue;
+    }
+
+    public void setPullThresholdSizeForQueue(final int 
pullThresholdSizeForQueue) {
+        this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
+    }
+
+    public int getPullThresholdSizeForTopic() {
+        return pullThresholdSizeForTopic;
+    }
+
+    public void setPullThresholdSizeForTopic(final int 
pullThresholdSizeForTopic) {
+        this.pullThresholdSizeForTopic = pullThresholdSizeForTopic;
+    }
+
     public Map<String, String> getSubscription() {
         return subscription;
     }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 4ba6216..72bc953 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -106,8 +106,8 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
     private MessageListener messageListenerInner;
     private OffsetStore offsetStore;
     private ConsumeMessageService consumeMessageService;
-    private long flowControlTimes1 = 0;
-    private long flowControlTimes2 = 0;
+    private long queueFlowControlTimes = 0;
+    private long queueMaxSpanFlowControlTimes = 0;
 
     public DefaultMQPushConsumerImpl(DefaultMQPushConsumer 
defaultMQPushConsumer, RPCHook rpcHook) {
         this.defaultMQPushConsumer = defaultMQPushConsumer;
@@ -219,13 +219,25 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
             return;
         }
 
-        long size = processQueue.getMsgCount().get();
-        if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
+        long cachedMessageCount = processQueue.getMsgCount().get();
+        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 
* 1024);
+
+        if (cachedMessageCount > 
this.defaultMQPushConsumer.getPullThresholdForQueue()) {
+            this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+            if ((queueFlowControlTimes++ % 1000) == 0) {
+                log.warn(
+                    "the cached message count exceeds the threshold {}, so do 
flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
pullRequest={}, flowControlTimes={}",
+                    this.defaultMQPushConsumer.getPullThresholdForQueue(), 
processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, 
cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
+            }
+            return;
+        }
+
+        if (cachedMessageSizeInMiB > 
this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
             this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
-            if ((flowControlTimes1++ % 1000) == 0) {
+            if ((queueFlowControlTimes++ % 1000) == 0) {
                 log.warn(
-                    "the consumer message buffer is full, so do flow control, 
minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
-                    processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
+                    "the cached message size exceeds the threshold {} MiB, so 
do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
pullRequest={}, flowControlTimes={}",
+                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), 
processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, 
cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
             }
             return;
         }
@@ -233,11 +245,11 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
         if (!this.consumeOrderly) {
             if (processQueue.getMaxSpan() > 
this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                 this.executePullRequestLater(pullRequest, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
-                if ((flowControlTimes2++ % 1000) == 0) {
+                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                     log.warn(
                         "the queue's messages, span too long, so do flow 
control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, 
flowControlTimes={}",
                         processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
-                        pullRequest, flowControlTimes2);
+                        pullRequest, queueMaxSpanFlowControlTimes);
                 }
                 return;
             }
@@ -732,6 +744,34 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
                 null);
         }
 
+        // pullThresholdForTopic
+        if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {
+            if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 || 
this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {
+                throw new MQClientException(
+                    "pullThresholdForTopic Out of range [1, 6553500]"
+                        + 
FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+            }
+        }
+
+        // pullThresholdSizeForQueue
+        if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 || 
this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {
+            throw new MQClientException(
+                "pullThresholdSizeForQueue Out of range [1, 1024]"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
+        }
+
+        if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {
+            // pullThresholdSizeForTopic
+            if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1 
|| this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {
+                throw new MQClientException(
+                    "pullThresholdSizeForTopic Out of range [1, 102400]"
+                        + 
FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                    null);
+            }
+        }
+
         // pullInterval
         if (this.defaultMQPushConsumer.getPullInterval() < 0 || 
this.defaultMQPushConsumer.getPullInterval() > 65535) {
             throw new MQClientException(
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 38b8073..e21dbc8 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -45,6 +45,7 @@ public class ProcessQueue {
     private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
     private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, 
MessageExt>();
     private final AtomicLong msgCount = new AtomicLong();
+    private final AtomicLong msgSize = new AtomicLong();
     private final Lock lockConsume = new ReentrantLock();
     private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, 
MessageExt>();
     private final AtomicLong tryUnlockTimes = new AtomicLong(0);
@@ -129,6 +130,7 @@ public class ProcessQueue {
                     if (null == old) {
                         validMsgCnt++;
                         this.queueOffsetMax = msg.getQueueOffset();
+                        msgSize.addAndGet(msg.getBody().length);
                     }
                 }
                 msgCount.addAndGet(validMsgCnt);
@@ -189,6 +191,7 @@ public class ProcessQueue {
                         MessageExt prev = 
msgTreeMap.remove(msg.getQueueOffset());
                         if (prev != null) {
                             removedCnt--;
+                            msgSize.addAndGet(0 - msg.getBody().length);
                         }
                     }
                     msgCount.addAndGet(removedCnt);
@@ -215,6 +218,10 @@ public class ProcessQueue {
         return msgCount;
     }
 
+    public AtomicLong getMsgSize() {
+        return msgSize;
+    }
+
     public boolean isDropped() {
         return dropped;
     }
@@ -250,7 +257,10 @@ public class ProcessQueue {
             this.lockTreeMap.writeLock().lockInterruptibly();
             try {
                 Long offset = this.msgTreeMapTemp.lastKey();
-                msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
+                msgCount.addAndGet(0 - this.msgTreeMapTemp.size());
+                for (MessageExt msg : this.msgTreeMapTemp.values()) {
+                    msgSize.addAndGet(0 - msg.getBody().length);
+                }
                 this.msgTreeMapTemp.clear();
                 if (offset != null) {
                     return offset + 1;
@@ -334,6 +344,7 @@ public class ProcessQueue {
                 this.msgTreeMap.clear();
                 this.msgTreeMapTemp.clear();
                 this.msgCount.set(0);
+                this.msgSize.set(0);
                 this.queueOffsetMax = 0L;
             } finally {
                 this.lockTreeMap.writeLock().unlock();
@@ -387,6 +398,7 @@ public class ProcessQueue {
                 info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
                 info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
                 info.setCachedMsgCount(this.msgTreeMap.size());
+                info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 
1024)));
             }
 
             if (!this.msgTreeMapTemp.isEmpty()) {
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 39e0251..e5166f3 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -57,6 +57,26 @@ public class RebalancePushImpl extends RebalanceImpl {
         long newVersion = System.currentTimeMillis();
         log.info("{} Rebalance changed, also update version: {}, {}", topic, 
subscriptionData.getSubVersion(), newVersion);
         subscriptionData.setSubVersion(newVersion);
+
+        int currentQueueCount = this.processQueueTable.size();
+        if (currentQueueCount != 0) {
+            int pullThresholdForTopic = 
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
+            if (pullThresholdForTopic != -1) {
+                int newVal = Math.max(1, pullThresholdForTopic / 
currentQueueCount);
+                log.info("The pullThresholdForQueue is changed from {} to {}",
+                    
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(),
 newVal);
+                
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
+            }
+
+            int pullThresholdSizeForTopic = 
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
+            if (pullThresholdSizeForTopic != -1) {
+                int newVal = Math.max(1, pullThresholdSizeForTopic / 
currentQueueCount);
+                log.info("The pullThresholdSizeForQueue is changed from {} to 
{}",
+                    
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(),
 newVal);
+                
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
+            }
+        }
+
         // notify broker
         this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
     }
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 94b3f0f..b21edc9 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -32,6 +32,7 @@ import 
org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
@@ -62,6 +63,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -207,6 +209,59 @@ public class DefaultMQPushConsumerTest {
         assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
     }
 
+    @Test
+    public void testCheckConfig() {
+        DefaultMQPushConsumer pushConsumer = createPushConsumer();
+
+        pushConsumer.setPullThresholdForQueue(65535 + 1);
+        try {
+            pushConsumer.start();
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("pullThresholdForQueue Out of 
range [1, 65535]");
+        }
+
+        pushConsumer = createPushConsumer();
+        pushConsumer.setPullThresholdForTopic(65535 * 100 + 1);
+
+        try {
+            pushConsumer.start();
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("pullThresholdForTopic Out of 
range [1, 6553500]");
+        }
+
+        pushConsumer = createPushConsumer();
+        pushConsumer.setPullThresholdSizeForQueue(1024 + 1);
+        try {
+            pushConsumer.start();
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("pullThresholdSizeForQueue Out 
of range [1, 1024]");
+        }
+
+        pushConsumer = createPushConsumer();
+        pushConsumer.setPullThresholdSizeForTopic(1024 * 100 + 1);
+        try {
+            pushConsumer.start();
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("pullThresholdSizeForTopic Out 
of range [1, 102400]");
+        }
+    }
+
+    private DefaultMQPushConsumer createPushConsumer() {
+        DefaultMQPushConsumer pushConsumer = new 
DefaultMQPushConsumer(consumerGroup);
+        pushConsumer.registerMessageListener(new MessageListenerConcurrently() 
{
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs,
+                ConsumeConcurrentlyContext context) {
+                return null;
+            }
+        });
+        return pushConsumer;
+    }
+
     private PullRequest createPullRequest() {
         PullRequest pullRequest = new PullRequest();
         pullRequest.setConsumerGroup(consumerGroup);
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
new file mode 100644
index 0000000..d6a6dcf
--- /dev/null
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProcessQueueTest {
+
+    @Test
+    public void testCachedMessageCount() {
+        ProcessQueue pq = new ProcessQueue();
+
+        pq.putMessage(createMessageList());
+
+        assertThat(pq.getMsgCount().get()).isEqualTo(100);
+
+        pq.takeMessags(10);
+        pq.commit();
+
+        assertThat(pq.getMsgCount().get()).isEqualTo(90);
+
+        
pq.removeMessage(Collections.singletonList(pq.getMsgTreeMap().lastEntry().getValue()));
+        assertThat(pq.getMsgCount().get()).isEqualTo(89);
+    }
+
+    @Test
+    public void testCachedMessageSize() {
+        ProcessQueue pq = new ProcessQueue();
+
+        pq.putMessage(createMessageList());
+
+        assertThat(pq.getMsgSize().get()).isEqualTo(100 * 123);
+
+        pq.takeMessags(10);
+        pq.commit();
+
+        assertThat(pq.getMsgSize().get()).isEqualTo(90 * 123);
+
+        
pq.removeMessage(Collections.singletonList(pq.getMsgTreeMap().lastEntry().getValue()));
+        assertThat(pq.getMsgSize().get()).isEqualTo(89 * 123);
+    }
+
+    @Test
+    public void testFillProcessQueueInfo() {
+        ProcessQueue pq = new ProcessQueue();
+        pq.putMessage(createMessageList(102400));
+
+        ProcessQueueInfo processQueueInfo = new ProcessQueueInfo();
+        pq.fillProcessQueueInfo(processQueueInfo);
+
+        assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(12);
+
+        pq.takeMessags(10000);
+        pq.commit();
+        pq.fillProcessQueueInfo(processQueueInfo);
+        assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(10);
+
+        pq.takeMessags(10000);
+        pq.commit();
+        pq.fillProcessQueueInfo(processQueueInfo);
+        assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(9);
+
+        pq.takeMessags(80000);
+        pq.commit();
+        pq.fillProcessQueueInfo(processQueueInfo);
+        assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(0);
+    }
+
+    private List<MessageExt> createMessageList() {
+        return createMessageList(100);
+    }
+
+    private List<MessageExt> createMessageList(int count) {
+        List<MessageExt> messageExtList = new ArrayList<MessageExt>();
+        for (int i = 0; i < count; i++) {
+            MessageExt messageExt = new MessageExt();
+            messageExt.setQueueOffset(i);
+            messageExt.setBody(new byte[123]);
+            messageExtList.add(messageExt);
+        }
+        return messageExtList;
+    }
+}
\ No newline at end of file
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
new file mode 100644
index 0000000..796a394
--- /dev/null
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RebalancePushImplTest {
+    @Spy
+    private DefaultMQPushConsumerImpl defaultMQPushConsumer = new 
DefaultMQPushConsumerImpl(new DefaultMQPushConsumer("RebalancePushImplTest"), 
null);
+    @Mock
+    private MQClientInstance mqClientInstance;
+    @Mock
+    private OffsetStore offsetStore;
+    private String consumerGroup = "CID_RebalancePushImplTest";
+    private String topic = "TopicA";
+
+    @Test
+    public void testMessageQueueChanged_CountThreshold() {
+        RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, 
MessageModel.CLUSTERING,
+            new AllocateMessageQueueAveragely(), mqClientInstance, 
defaultMQPushConsumer);
+        init(rebalancePush);
+
+        // Just set pullThresholdForQueue
+        
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
+        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
+        allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
+        allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
+        doRebalanceForcibly(rebalancePush, allocateResultSet);
+        
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(1024);
+
+        // Set pullThresholdForTopic
+        
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForTopic(1024);
+        doRebalanceForcibly(rebalancePush, allocateResultSet);
+        
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(512);
+
+        // Change message queue allocate result
+        allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
+        doRebalanceForcibly(rebalancePush, allocateResultSet);
+        
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(341);
+    }
+
+    private void doRebalanceForcibly(RebalancePushImpl rebalancePush, 
Set<MessageQueue> allocateResultSet) {
+        rebalancePush.topicSubscribeInfoTable.put(topic, allocateResultSet);
+        rebalancePush.doRebalance(false);
+        rebalancePush.messageQueueChanged(topic, allocateResultSet, 
allocateResultSet);
+    }
+
+    private void init(final RebalancePushImpl rebalancePush) {
+        rebalancePush.getSubscriptionInner().putIfAbsent(topic, new 
SubscriptionData());
+
+        rebalancePush.subscriptionInner.putIfAbsent(topic, new 
SubscriptionData());
+
+        when(mqClientInstance.findConsumerIdList(anyString(), 
anyString())).thenReturn(Collections.singletonList(consumerGroup));
+        when(mqClientInstance.getClientId()).thenReturn(consumerGroup);
+        when(defaultMQPushConsumer.getOffsetStore()).thenReturn(offsetStore);
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws 
Throwable {
+                return null;
+            }
+        
}).when(defaultMQPushConsumer).executePullRequestImmediately(any(PullRequest.class));
+    }
+
+    @Test
+    public void testMessageQueueChanged_SizeThreshold() {
+        RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, 
MessageModel.CLUSTERING,
+            new AllocateMessageQueueAveragely(), mqClientInstance, 
defaultMQPushConsumer);
+        init(rebalancePush);
+
+        // Just set pullThresholdSizeForQueue
+        
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
+        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
+        allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
+        allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
+        doRebalanceForcibly(rebalancePush, allocateResultSet);
+        
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(1024);
+
+        // Set pullThresholdSizeForTopic
+        
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForTopic(1024);
+        doRebalanceForcibly(rebalancePush, allocateResultSet);
+        
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(512);
+
+        // Change message queue allocate result
+        allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
+        doRebalanceForcibly(rebalancePush, allocateResultSet);
+        
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(341);
+    }
+
+    @Test
+    public void testMessageQueueChanged_ConsumerRuntimeInfo() throws 
MQClientException {
+        RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup, 
MessageModel.CLUSTERING,
+            new AllocateMessageQueueAveragely(), mqClientInstance, 
defaultMQPushConsumer);
+        init(rebalancePush);
+
+        
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
+        
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
+        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
+        allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
+        allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
+        doRebalanceForcibly(rebalancePush, allocateResultSet);
+
+        defaultMQPushConsumer.setConsumeMessageService(new 
ConsumeMessageConcurrentlyService(defaultMQPushConsumer, null));
+        
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("1024");
+        
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("1024");
+        
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("-1");
+        
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("-1");
+
+        
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForTopic(1024);
+        
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForTopic(1024);
+        doRebalanceForcibly(rebalancePush, allocateResultSet);
+        
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("512");
+        
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("512");
+        
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
+        
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
+
+        // Change message queue allocate result
+        allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
+        doRebalanceForcibly(rebalancePush, allocateResultSet);
+        
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("341");
+        
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("341");
+        
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
+        
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
+    }
+}
\ No newline at end of file
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
index e2e9943..6b220b8 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
@@ -25,6 +25,7 @@ public class ProcessQueueInfo {
     private long cachedMsgMinOffset;
     private long cachedMsgMaxOffset;
     private int cachedMsgCount;
+    private int cachedMsgSizeInMiB;
 
     private long transactionMsgMinOffset;
     private long transactionMsgMaxOffset;
@@ -142,16 +143,24 @@ public class ProcessQueueInfo {
         this.lastConsumeTimestamp = lastConsumeTimestamp;
     }
 
+    public int getCachedMsgSizeInMiB() {
+        return cachedMsgSizeInMiB;
+    }
+
+    public void setCachedMsgSizeInMiB(final int cachedMsgSizeInMiB) {
+        this.cachedMsgSizeInMiB = cachedMsgSizeInMiB;
+    }
+
     @Override
     public String toString() {
         return "ProcessQueueInfo [commitOffset=" + commitOffset + ", 
cachedMsgMinOffset="
-            + cachedMsgMinOffset + ", cachedMsgMaxOffset=" + 
cachedMsgMaxOffset + ", cachedMsgCount="
-            + cachedMsgCount + ", transactionMsgMinOffset=" + 
transactionMsgMinOffset
+            + cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset
+            + ", cachedMsgCount=" + cachedMsgCount + ", cachedMsgSizeInMiB=" + 
cachedMsgSizeInMiB
+            + ", transactionMsgMinOffset=" + transactionMsgMinOffset
             + ", transactionMsgMaxOffset=" + transactionMsgMaxOffset + ", 
transactionMsgCount="
             + transactionMsgCount + ", locked=" + locked + ", tryUnlockTimes=" 
+ tryUnlockTimes
             + ", lastLockTimestamp=" + 
UtilAll.timeMillisToHumanString(lastLockTimestamp) + ", droped="
             + droped + ", lastPullTimestamp=" + 
UtilAll.timeMillisToHumanString(lastPullTimestamp)
             + ", lastConsumeTimestamp=" + 
UtilAll.timeMillisToHumanString(lastConsumeTimestamp) + "]";
-
     }
 }

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to