This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/litePullConsumer by this push:
     new 9c3b26c  Polish lite pull consumer (#1359)
9c3b26c is described below

commit 9c3b26cfd3a7b5c7b87bb13c4ab38f249107e349
Author: King <[email protected]>
AuthorDate: Fri Aug 2 10:34:06 2019 +0800

    Polish lite pull consumer (#1359)
    
    * fix unsubscribe code
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * polish commit consumed offset
    
    * pass checkstyle
    
    * pass checkstyle
    
    * polish LiteMQPullConsumer
    
    * add flow control and polish commit logic
    
    * fix bug
    
    * polish code
    
    * fix commit consumed offset back
    
    * refactor litePullConsumer
    
    * development save
    
    * development save
    
    * Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.
    
    * Polish lite pull consumer
    
    * polish lite pull consumer
    
    * polish lite pull consumer
    
    * fix seek
    
    * fix seek function
    
    * polish lite pull consumer
    
    * add apache header
    
    * add test
    
    * polish test
---
 .../client/consumer/DefaultLitePullConsumer.java   |  27 ++-
 .../client/impl/consumer/AssignedMessageQueue.java |  41 +++-
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 180 ++++++++------
 .../impl/consumer/RebalanceLitePullImpl.java       |  24 +-
 .../consumer/DefaultLitePullConsumerTest.java      | 261 +++++++++++++++++++++
 .../example/simple/LitePullConsumerTest.java       |  22 +-
 6 files changed, 448 insertions(+), 107 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 757c966..7f65713 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -32,10 +32,9 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.RPCHook;
 
-
 public class DefaultLitePullConsumer extends ClientConfig implements 
LitePullConsumer {
 
-    private DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
+    private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
 
     /**
      * Do the same thing for the same Group, the application must be set,and 
guarantee Globally unique
@@ -47,7 +46,6 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
      */
     private long brokerSuspendMaxTimeMillis = 1000 * 20;
 
-
     /**
      * Long polling mode, the Consumer connection timeout(must greater than 
brokerSuspendMaxTimeMillis), it is not
      * recommended to modify
@@ -134,10 +132,15 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
     private int pullThresholdSizeForQueue = 100;
 
     /**
-     * The socket timeout in milliseconds
+     * The poll timeout in milliseconds
      */
     private long pollTimeoutMillis = 1000 * 5;
 
+    /**
+     * Message pull delay in milliseconds
+     */
+    private long pullDelayTimeMills = 0;
+
     public DefaultLitePullConsumer() {
         this(null, MixAll.DEFAULT_CONSUMER_GROUP, null);
     }
@@ -163,7 +166,7 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
     public DefaultLitePullConsumer(final String namespace, final String 
consumerGroup, RPCHook rpcHook) {
         this.namespace = namespace;
         this.consumerGroup = consumerGroup;
-        defaultLitePullConsumerImpl = new 
DefaultLitePullConsumerImpl(this,rpcHook);
+        defaultLitePullConsumerImpl = new DefaultLitePullConsumerImpl(this, 
rpcHook);
     }
 
     @Override
@@ -217,13 +220,13 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
     }
 
     @Override
-    public Collection<MessageQueue> fetchMessageQueues(String topic) throws 
MQClientException{
+    public Collection<MessageQueue> fetchMessageQueues(String topic) throws 
MQClientException {
         return 
this.defaultLitePullConsumerImpl.fetchMessageQueues(withNamespace(topic));
     }
 
     @Override
-    public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) 
throws MQClientException{
-        return 
this.defaultLitePullConsumerImpl.searchOffset(messageQueue,timestamp);
+    public Long offsetForTimestamp(MessageQueue messageQueue, Long timestamp) 
throws MQClientException {
+        return this.defaultLitePullConsumerImpl.searchOffset(messageQueue, 
timestamp);
     }
 
     @Override
@@ -393,4 +396,12 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
         this.consumerTimeoutMillisWhenSuspend = 
consumerTimeoutMillisWhenSuspend;
     }
 
+    public long getPullDelayTimeMills() {
+        return pullDelayTimeMills;
+    }
+
+    public void setPullDelayTimeMills(long pullDelayTimeMills) {
+        this.pullDelayTimeMills = pullDelayTimeMills;
+    }
+
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index a3c5da1..aa8379e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -20,6 +20,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.common.CountDownLatch2;
 import org.apache.rocketmq.common.message.MessageQueue;
 
 public class AssignedMessageQueue {
@@ -36,7 +37,7 @@ public class AssignedMessageQueue {
         this.rebalanceImpl = rebalanceImpl;
     }
 
-    public Collection<MessageQueue> messageQueues(){
+    public Collection<MessageQueue> messageQueues() {
         return assignedMessageQueueState.keySet();
     }
 
@@ -52,6 +53,7 @@ public class AssignedMessageQueue {
         for (MessageQueue messageQueue : messageQueues) {
             MessageQueueStat messageQueueStat = 
assignedMessageQueueState.get(messageQueue);
             if (assignedMessageQueueState.get(messageQueue) != null) {
+                messageQueueStat.getPausedLatch().reset();
                 messageQueueStat.setPaused(true);
             }
         }
@@ -62,6 +64,7 @@ public class AssignedMessageQueue {
             MessageQueueStat messageQueueStat = 
assignedMessageQueueState.get(messageQueue);
             if (assignedMessageQueueState.get(messageQueue) != null) {
                 messageQueueStat.setPaused(false);
+                messageQueueStat.getPausedLatch().reset();
             }
         }
     }
@@ -74,18 +77,18 @@ public class AssignedMessageQueue {
         return null;
     }
 
-    public long getNextOffset(MessageQueue messageQueue) {
+    public long getPullOffset(MessageQueue messageQueue) {
         MessageQueueStat messageQueueStat = 
assignedMessageQueueState.get(messageQueue);
         if (messageQueueStat != null) {
-            return messageQueueStat.getNextOffset();
+            return messageQueueStat.getPullOffset();
         }
         return -1;
     }
 
-    public void updateNextOffset(MessageQueue messageQueue, long offset) {
+    public void updatePullOffset(MessageQueue messageQueue, long offset) {
         MessageQueueStat messageQueueStat = 
assignedMessageQueueState.get(messageQueue);
         if (messageQueueStat != null) {
-            messageQueueStat.setNextOffset(offset);
+            messageQueueStat.setPullOffset(offset);
         }
     }
 
@@ -119,12 +122,21 @@ public class AssignedMessageQueue {
         return -1;
     }
 
+    public CountDownLatch2 getPausedLatch(MessageQueue messageQueue) {
+        MessageQueueStat messageQueueStat = 
assignedMessageQueueState.get(messageQueue);
+        if (messageQueueStat != null) {
+            return messageQueueStat.getPausedLatch();
+        }
+        return null;
+    }
+
     public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) {
         synchronized (this.assignedMessageQueueState) {
             Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = 
this.assignedMessageQueueState.entrySet().iterator();
             while (it.hasNext()) {
                 Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
                 if (!assigned.contains(next.getKey())) {
+                    next.getValue().getProcessQueue().setDropped(true);
                     it.remove();
                 }
             }
@@ -159,10 +171,11 @@ public class AssignedMessageQueue {
     public class MessageQueueStat {
         private MessageQueue messageQueue;
         private ProcessQueue processQueue;
-        private boolean paused = false;
-        private long nextOffset = -1;
-        private long consumeOffset = -1;
+        private volatile boolean paused = false;
+        private volatile long pullOffset = -1;
+        private volatile long consumeOffset = -1;
         private volatile long seekOffset = -1;
+        private CountDownLatch2 pausedLatch = new CountDownLatch2(1);
 
         public MessageQueueStat(MessageQueue messageQueue, ProcessQueue 
processQueue) {
             this.messageQueue = messageQueue;
@@ -185,12 +198,12 @@ public class AssignedMessageQueue {
             this.paused = paused;
         }
 
-        public long getNextOffset() {
-            return nextOffset;
+        public long getPullOffset() {
+            return pullOffset;
         }
 
-        public void setNextOffset(long nextOffset) {
-            this.nextOffset = nextOffset;
+        public void setPullOffset(long pullOffset) {
+            this.pullOffset = pullOffset;
         }
 
         public ProcessQueue getProcessQueue() {
@@ -216,5 +229,9 @@ public class AssignedMessageQueue {
         public void setSeekOffset(long seekOffset) {
             this.seekOffset = seekOffset;
         }
+
+        public CountDownLatch2 getPausedLatch() {
+            return pausedLatch;
+        }
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 95e218f..74cf644 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -16,27 +16,22 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.TreeMap;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
 
-import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
@@ -56,6 +51,7 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.CountDownLatch2;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -65,7 +61,11 @@ import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
 
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -150,7 +150,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
     }
 
     private void checkServiceState() {
-        if (!(this.serviceState == ServiceState.RUNNING))
+        if (this.serviceState != ServiceState.RUNNING)
             throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
     }
 
@@ -347,6 +347,11 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
                     + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                 null);
         }
+
+    }
+
+    public PullAPIWrapper getPullAPIWrapper() {
+        return pullAPIWrapper;
     }
 
     private void copySubscription() throws MQClientException {
@@ -440,16 +445,24 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
     public List<MessageExt> poll(long timeout) {
         try {
             checkServiceState();
+            if (timeout < 0)
+                throw new IllegalArgumentException("Timeout must not be 
negative");
+
             if (defaultLitePullConsumer.isAutoCommit()) {
                 maybeAutoCommit();
             }
             long endTime = System.currentTimeMillis() + timeout;
+
             ConsumeRequest consumeRequest = consumeRequestCache.poll(endTime - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-            while (consumeRequest != null && 
consumeRequest.getProcessQueue().isDropped()) {
-                consumeRequest = consumeRequestCache.poll(endTime - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-                if ((endTime - System.currentTimeMillis()) <= 0)
-                    break;
+
+            if (endTime - System.currentTimeMillis() > 0) {
+                while (consumeRequest != null && 
consumeRequest.getProcessQueue().isDropped()) {
+                    consumeRequest = consumeRequestCache.poll(endTime - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+                    if (endTime - System.currentTimeMillis() <= 0)
+                        break;
+                }
             }
+
             if (consumeRequest != null && 
!consumeRequest.getProcessQueue().isDropped()) {
                 List<MessageExt> messages = consumeRequest.getMessageExts();
                 long offset = 
consumeRequest.getProcessQueue().removeMessage(messages);
@@ -471,14 +484,33 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
     }
 
     public synchronized void seek(MessageQueue messageQueue, long offset) 
throws MQClientException {
-        if (offset < minOffset(messageQueue) || offset > 
maxOffset(messageQueue))
-            throw new MQClientException("Seek offset illegal", null);
+        if (!assignedMessageQueue.messageQueues().contains(messageQueue))
+            throw new MQClientException("The message queue is not in assigned 
list, message queue: " + messageQueue, null);
+        long minOffset = minOffset(messageQueue);
+        long maxOffset = maxOffset(messageQueue);
+        if (offset < minOffset || offset > maxOffset)
+            throw new MQClientException("Seek offset illegal, seek offset = " 
+ offset + ", min offset = " + minOffset + ", max offset = " + maxOffset, null);
         try {
+            
assignedMessageQueue.pause(Collections.singletonList(messageQueue));
+            CountDownLatch2 pausedLatch = 
assignedMessageQueue.getPausedLatch(messageQueue);
+            if (pausedLatch != null) {
+                pausedLatch.await(2, TimeUnit.SECONDS);
+            }
+            ProcessQueue processQueue = 
assignedMessageQueue.getProcessQueue(messageQueue);
+            if (processQueue != null) {
+                processQueue.clear();
+            }
+            Iterator<ConsumeRequest> iter = consumeRequestCache.iterator();
+            while (iter.hasNext()) {
+                if (iter.next().getMessageQueue().equals(messageQueue))
+                    iter.remove();
+            }
             assignedMessageQueue.setSeekOffset(messageQueue, offset);
-            updateConsumeOffset(messageQueue, offset);
-            updateConsumeOffsetToBroker(messageQueue, offset, false);
+            assignedMessageQueue.updateConsumeOffset(messageQueue, offset);
         } catch (Exception e) {
             log.error("Seek offset failed.", e);
+        } finally {
+            
assignedMessageQueue.resume(Collections.singletonList(messageQueue));
         }
     }
 
@@ -545,7 +577,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
 
     private void updatePullOffset(MessageQueue remoteQueue, long 
nextPullOffset) {
         if (assignedMessageQueue.getSeekOffset(remoteQueue) == -1) {
-            assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
+            assignedMessageQueue.updatePullOffset(remoteQueue, nextPullOffset);
         }
     }
 
@@ -568,12 +600,12 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         if (seekOffset != -1) {
             offset = seekOffset;
             assignedMessageQueue.setSeekOffset(remoteQueue, -1);
-            assignedMessageQueue.updateNextOffset(remoteQueue,offset);
+            assignedMessageQueue.updatePullOffset(remoteQueue, offset);
         } else {
-            offset = assignedMessageQueue.getNextOffset(remoteQueue);
+            offset = assignedMessageQueue.getPullOffset(remoteQueue);
             if (offset == -1) {
                 offset = fetchConsumeOffset(remoteQueue, false);
-                assignedMessageQueue.updateNextOffset(remoteQueue, offset);
+                assignedMessageQueue.updatePullOffset(remoteQueue, offset);
                 assignedMessageQueue.updateConsumeOffset(remoteQueue, offset);
             }
         }
@@ -596,78 +628,82 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
 
         @Override
         public void run() {
-            ProcessQueue processQueue = 
assignedMessageQueue.getProcessQueue(messageQueue);
 
-            if (processQueue == null && processQueue.isDropped()) {
-                log.info("the message queue not be able to poll, because it's 
dropped. group={}, messageQueue={}", 
defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
-                return;
-            }
+            if (!this.isCancelled()) {
 
-            if (consumeRequestCache.size() * 
defaultLitePullConsumer.getPullBatchNums() > 
defaultLitePullConsumer.getPullThresholdForAll()) {
-                scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
-                if ((consumeRequestFlowControlTimes++ % 1000) == 0)
-                    log.warn("the consume request count exceeds threshold {}, 
so do flow control, consume request count={}, flowControlTimes={}", 
consumeRequestCache.size(), consumeRequestFlowControlTimes);
-                return;
-            }
+                if (assignedMessageQueue.isPaused(messageQueue)) {
+                    CountDownLatch2 pasuedLatch = 
assignedMessageQueue.getPausedLatch(messageQueue);
+                    if (pasuedLatch != null)
+                        pasuedLatch.countDown();
+                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
+                    log.debug("Message Queue: {} has been paused!", 
messageQueue);
+                    return;
+                }
 
-            long cachedMessageCount = processQueue.getMsgCount().get();
-            long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / 
(1024 * 1024);
+                ProcessQueue processQueue = 
assignedMessageQueue.getProcessQueue(messageQueue);
 
-            if (cachedMessageCount > 
defaultLitePullConsumer.getPullThresholdForQueue()) {
-                scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
-                if ((queueFlowControlTimes++ % 1000) == 0) {
-                    log.warn(
-                        "the cached message count exceeds the threshold {}, so 
do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
flowControlTimes={}",
-                        defaultLitePullConsumer.getPullThresholdForQueue(), 
processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, 
cachedMessageSizeInMiB, queueFlowControlTimes);
+                if (processQueue == null && processQueue.isDropped()) {
+                    log.info("the message queue not be able to poll, because 
it's dropped. group={}, messageQueue={}", 
defaultLitePullConsumer.getConsumerGroup(), this.messageQueue);
+                    return;
                 }
-                return;
-            }
 
-            if (cachedMessageSizeInMiB > 
defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
-                scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
-                if ((queueFlowControlTimes++ % 1000) == 0) {
-                    log.warn(
-                        "the cached message size exceeds the threshold {} MiB, 
so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
flowControlTimes={}",
-                        
defaultLitePullConsumer.getPullThresholdSizeForQueue(), 
processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, 
cachedMessageSizeInMiB, queueFlowControlTimes);
+                if (consumeRequestCache.size() * 
defaultLitePullConsumer.getPullBatchNums() > 
defaultLitePullConsumer.getPullThresholdForAll()) {
+                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    if ((consumeRequestFlowControlTimes++ % 1000) == 0)
+                        log.warn("the consume request count exceeds threshold 
{}, so do flow control, consume request count={}, flowControlTimes={}", 
consumeRequestCache.size(), consumeRequestFlowControlTimes);
+                    return;
                 }
-                return;
-            }
 
-            if (processQueue.getMaxSpan() > 
defaultLitePullConsumer.getConsumeMaxSpan()) {
-                scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
-                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
-                    log.warn(
-                        "the queue's messages, span too long, so do flow 
control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
-                        processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), 
queueMaxSpanFlowControlTimes);
+                long cachedMessageCount = processQueue.getMsgCount().get();
+                long cachedMessageSizeInMiB = processQueue.getMsgSize().get() 
/ (1024 * 1024);
+
+                if (cachedMessageCount > 
defaultLitePullConsumer.getPullThresholdForQueue()) {
+                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    if ((queueFlowControlTimes++ % 1000) == 0) {
+                        log.warn(
+                            "the cached message count exceeds the threshold 
{}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
flowControlTimes={}",
+                            
defaultLitePullConsumer.getPullThresholdForQueue(), 
processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, 
cachedMessageSizeInMiB, queueFlowControlTimes);
+                    }
+                    return;
                 }
-                return;
-            }
 
-            if (!this.isCancelled()) {
-                if (assignedMessageQueue.isPaused(messageQueue)) {
-                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_PAUSE, TimeUnit.MILLISECONDS);
-                    log.debug("Message Queue: {} has been paused!", 
messageQueue);
+                if (cachedMessageSizeInMiB > 
defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
+                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    if ((queueFlowControlTimes++ % 1000) == 0) {
+                        log.warn(
+                            "the cached message size exceeds the threshold {} 
MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, 
flowControlTimes={}",
+                            
defaultLitePullConsumer.getPullThresholdSizeForQueue(), 
processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, 
cachedMessageSizeInMiB, queueFlowControlTimes);
+                    }
                     return;
                 }
+
+                if (processQueue.getMaxSpan() > 
defaultLitePullConsumer.getConsumeMaxSpan()) {
+                    scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
+                        log.warn(
+                            "the queue's messages, span too long, so do flow 
control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
+                            processQueue.getMsgTreeMap().firstKey(), 
processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), 
queueMaxSpanFlowControlTimes);
+                    }
+                    return;
+                }
+
                 String subExpression = null;
                 if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                     String topic = this.messageQueue.getTopic();
                     subExpression = 
rebalanceImpl.getSubscriptionInner().get(topic).getSubString();
                 }
                 long offset = nextPullOffset(messageQueue);
-                long pullDelayTimeMills = 0;
+                long pullDelayTimeMills = 
defaultLitePullConsumer.getPullDelayTimeMills();
                 try {
                     PullResult pullResult = pull(messageQueue, subExpression, 
offset, nextPullBatchNums());
                     switch (pullResult.getPullStatus()) {
                         case FOUND:
-                            
processQueue.putMessage(pullResult.getMsgFoundList());
-                            submitConsumeRequest(new 
ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
-                            pullDelayTimeMills = 0;
+                            if (pullResult.getMsgFoundList() != null && 
!pullResult.getMsgFoundList().isEmpty()) {
+                                
processQueue.putMessage(pullResult.getMsgFoundList());
+                                submitConsumeRequest(new 
ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
+                            }
                             break;
-                        case NO_NEW_MSG:
-                            pullDelayTimeMills = 100;
                         case OFFSET_ILLEGAL:
-                            //TODO
                             log.warn("the pull request offset illegal, {}", 
pullResult.toString());
                             break;
                         default:
@@ -1037,7 +1073,6 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         private final List<MessageExt> messageExts;
         private final MessageQueue messageQueue;
         private final ProcessQueue processQueue;
-        private long startConsumeTimeMillis;
 
         public ConsumeRequest(final List<MessageExt> messageExts, final 
MessageQueue messageQueue,
             final ProcessQueue processQueue) {
@@ -1058,12 +1093,5 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
             return processQueue;
         }
 
-        public long getStartConsumeTimeMillis() {
-            return startConsumeTimeMillis;
-        }
-
-        public void setStartConsumeTimeMillis(final long 
startConsumeTimeMillis) {
-            this.startConsumeTimeMillis = startConsumeTimeMillis;
-        }
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 8148c7d..0b8ec67 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.client.impl.consumer;
 
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
@@ -10,7 +26,7 @@ import 
org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import java.util.List;
 import java.util.Set;
 
-public class RebalanceLitePullImpl extends RebalanceImpl  {
+public class RebalanceLitePullImpl extends RebalanceImpl {
 
     private final DefaultLitePullConsumerImpl litePullConsumerImpl;
 
@@ -19,8 +35,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl  {
     }
 
     public RebalanceLitePullImpl(String consumerGroup, MessageModel 
messageModel,
-                                 AllocateMessageQueueStrategy 
allocateMessageQueueStrategy,
-                                 MQClientInstance mQClientFactory, 
DefaultLitePullConsumerImpl litePullConsumerImpl) {
+        AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+        MQClientInstance mQClientFactory, DefaultLitePullConsumerImpl 
litePullConsumerImpl) {
         super(consumerGroup, messageModel, allocateMessageQueueStrategy, 
mQClientFactory);
         this.litePullConsumerImpl = litePullConsumerImpl;
     }
@@ -37,7 +53,6 @@ public class RebalanceLitePullImpl extends RebalanceImpl  {
         }
     }
 
-
     @Override
     public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue 
pq) {
         this.litePullConsumerImpl.getOffsetStore().persist(mq);
@@ -64,5 +79,4 @@ public class RebalanceLitePullImpl extends RebalanceImpl  {
     public void dispatchPullRequest(List<PullRequest> pullRequestList) {
     }
 
-
 }
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
new file mode 100644
index 0000000..68144c7
--- /dev/null
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.consumer;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQAdminImpl;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
+import org.apache.rocketmq.client.impl.consumer.RebalanceService;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultLitePullConsumerTest {
+    @Spy
+    private MQClientInstance mQClientFactory = 
MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+
+    @Mock
+    private MQClientAPIImpl mQClientAPIImpl;
+    @Mock
+    private MQAdminImpl mQAdminImpl;
+
+    private RebalanceImpl rebalanceImpl;
+    private OffsetStore offsetStore;
+    private DefaultLitePullConsumer litePullConsumer;
+    private DefaultLitePullConsumerImpl litePullConsumerImpl;
+    private String consumerGroup = "LitePullConsumerGroup";
+    private String topic = "LitePullConsumerTest";
+    private String brokerName = "BrokerA";
+
+    @Before
+    public void init() throws Exception { // starts a consumer, then rewires it via reflection to the spied factory and the mocks
+        String groupName = consumerGroup + System.currentTimeMillis();
+        litePullConsumer = new DefaultLitePullConsumer(groupName);
+        litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
+
+        Field field = 
MQClientInstance.class.getDeclaredField("rebalanceService");
+        field.setAccessible(true);
+        RebalanceService rebalanceService = (RebalanceService) 
field.get(mQClientFactory);
+        field = RebalanceService.class.getDeclaredField("waitInterval");
+        field.setAccessible(true);
+        field.set(rebalanceService, 100); // overrides waitInterval before start(); presumably milliseconds - confirm
+
+        litePullConsumer.start();
+
+        field = 
DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
+        field.setAccessible(true);
+        litePullConsumerImpl = (DefaultLitePullConsumerImpl) 
field.get(litePullConsumer);
+        field = 
DefaultLitePullConsumerImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(litePullConsumerImpl, mQClientFactory); // consumer impl now uses the spied client factory
+
+        PullAPIWrapper pullAPIWrapper = 
litePullConsumerImpl.getPullAPIWrapper();
+        field = PullAPIWrapper.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(pullAPIWrapper, mQClientFactory); // pull wrapper now uses the spied client factory
+
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQClientAPIImpl); // spied factory now holds the mocked MQClientAPIImpl
+
+        field = MQClientInstance.class.getDeclaredField("mQAdminImpl");
+        field.setAccessible(true);
+        field.set(mQClientFactory, mQAdminImpl); // spied factory now holds the mocked MQAdminImpl
+
+        field = 
DefaultLitePullConsumerImpl.class.getDeclaredField("rebalanceImpl");
+        field.setAccessible(true);
+        rebalanceImpl = (RebalanceImpl) field.get(litePullConsumerImpl);
+        field = RebalanceImpl.class.getDeclaredField("mQClientFactory");
+        field.setAccessible(true);
+        field.set(rebalanceImpl, mQClientFactory); // rebalance impl now uses the spied client factory
+
+        offsetStore = spy(litePullConsumerImpl.getOffsetStore()); // spy the real store so readOffset can be stubbed below
+        field = 
DefaultLitePullConsumerImpl.class.getDeclaredField("offsetStore");
+        field.setAccessible(true);
+        field.set(litePullConsumerImpl, offsetStore);
+
+        when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), 
any(PullMessageRequestHeader.class),
+            anyLong(), any(CommunicationMode.class), 
nullable(PullCallback.class)))
+            .thenAnswer(new Answer<Object>() {
+                @Override
+                public Object answer(InvocationOnMock mock) throws Throwable {
+                    PullMessageRequestHeader requestHeader = 
mock.getArgument(1);
+                    MessageClientExt messageClientExt = new MessageClientExt();
+                    messageClientExt.setTopic(topic);
+                    messageClientExt.setQueueId(0);
+                    messageClientExt.setMsgId("123");
+                    messageClientExt.setBody(new byte[] {'a'});
+                    messageClientExt.setOffsetMsgId("234");
+                    messageClientExt.setBornHost(new InetSocketAddress(8080));
+                    messageClientExt.setStoreHost(new InetSocketAddress(8080));
+                    PullResult pullResult = createPullResult(requestHeader, 
PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+                    return pullResult;
+                }
+            }); // each matching pullMessage call answers FOUND with one message whose body is {'a'}
+
+        when(mQClientFactory.findBrokerAddressInSubscribe(anyString(), 
anyLong(), anyBoolean())).thenReturn(new FindBrokerResult("127.0.0.1:10911", 
false));
+
+        
doReturn(Collections.singletonList(mQClientFactory.getClientId())).when(mQClientFactory).findConsumerIdList(anyString(),
 anyString()); // the only consumer id reported is this client's own id
+
+        doReturn(123L).when(offsetStore).readOffset(any(MessageQueue.class), 
any(ReadOffsetType.class)); // every readOffset call on the spied store yields 123
+    }
+
+    @After
+    public void terminate() {
+        litePullConsumer.shutdown(); // shuts down the consumer that init() started
+    }
+
+    @Test
+    public void testAssign_PollMessageSuccess() { // assign mode: poll() must hand back the message stubbed in init()
+        litePullConsumer.setPullDelayTimeMills(60 * 1000);
+        final MessageQueue queue = createMessageQueue();
+        litePullConsumer.assign(Collections.singletonList(queue));
+        final List<MessageExt> polled = litePullConsumer.poll();
+        assertThat(polled.get(0).getTopic()).isEqualTo(topic);
+        assertThat(polled.get(0).getBody()).isEqualTo(new byte[] {'a'});
+    }
+
+    @Test
+    public void testSubscribe_PollMessageSuccess() throws MQClientException { // subscribe mode: same expectation as assign mode
+        litePullConsumer.setPullDelayTimeMills(60 * 1000);
+        litePullConsumer.setPollTimeoutMillis(20 * 1000);
+        litePullConsumer.subscribe(topic, "*");
+        final Set<MessageQueue> queues = new HashSet<MessageQueue>();
+        queues.add(createMessageQueue());
+        litePullConsumerImpl.updateTopicSubscribeInfo(topic, queues);
+        final List<MessageExt> polled = litePullConsumer.poll();
+        assertThat(polled.get(0).getTopic()).isEqualTo(topic);
+        assertThat(polled.get(0).getBody()).isEqualTo(new byte[] {'a'});
+    }
+
+    @Test
+    public void testSubscriptionType_AssignAndSubscribeExclusive() throws MQClientException {
+        try {
+            litePullConsumer.subscribe(topic, "*");
+            litePullConsumer.assign(Collections.singletonList(createMessageQueue())); // second subscription type must be rejected
+            failBecauseExceptionWasNotThrown(IllegalStateException.class);
+        } catch (IllegalStateException expected) {
+            assertThat(expected).hasMessageContaining("Cannot select two subscription types at the same time.");
+        }
+    }
+
+    @Test
+    public void testFetchMessageQueues_FetchMessageQueuesBeforeStart() throws MQClientException {
+        try {
+            DefaultLitePullConsumer unstartedConsumer = createLitePullConsumer(); // distinct from the field, which init() already started
+            unstartedConsumer.fetchMessageQueues(topic);
+            failBecauseExceptionWasNotThrown(IllegalStateException.class);
+        } catch (IllegalStateException e) {
+            assertThat(e).hasMessageContaining("The consumer not running.");
+        }
+    }
+
+    @Test
+    public void testSeek_SeekOffsetIllegal() throws MQClientException { // stubbed valid range is [0, 100]; -1 and 1000 fall outside
+        final MessageQueue queue = createMessageQueue();
+        when(mQAdminImpl.minOffset(any(MessageQueue.class))).thenReturn(0L);
+        when(mQAdminImpl.maxOffset(any(MessageQueue.class))).thenReturn(100L);
+        litePullConsumer.assign(Collections.singletonList(queue));
+        try {
+            litePullConsumer.seek(queue, -1);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException belowMin) {
+            assertThat(belowMin).hasMessageContaining("min offset = 0");
+        }
+
+        try {
+            litePullConsumer.seek(queue, 1000);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException aboveMax) {
+            assertThat(aboveMax).hasMessageContaining("max offset = 100");
+        }
+    }
+
+    @Test
+    public void testSeek_MessageQueueNotInAssignList() {
+        try {
+            litePullConsumer.seek(createMessageQueue(), 0); // nothing was assigned beforehand in this test
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException expected) {
+            assertThat(expected).hasMessageContaining("The message queue is not in assigned list");
+        }
+    }
+
+    private MessageQueue createMessageQueue() {
+        MessageQueue messageQueue = new MessageQueue();
+        messageQueue.setBrokerName(brokerName);
+        messageQueue.setQueueId(0);
+        messageQueue.setTopic(topic);
+        return messageQueue;
+    }
+
+    private DefaultLitePullConsumer createLitePullConsumer() {
+        DefaultLitePullConsumer litePullConsumer = new 
DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
+        return litePullConsumer;
+    }
+
+    private PullResultExt createPullResult(PullMessageRequestHeader 
requestHeader, PullStatus pullStatus,
+        List<MessageExt> messageExtList) throws Exception {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        for (MessageExt messageExt : messageExtList) {
+            outputStream.write(MessageDecoder.encode(messageExt, false));
+        }
+        return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + 
messageExtList.size(), 123, 2048, messageExtList, 0, 
outputStream.toByteArray());
+    }
+}
diff --git 
a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
 
b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
index 488a499..0430465 100644
--- 
a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
+++ 
b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
@@ -16,23 +16,33 @@
  */
 package org.apache.rocketmq.example.simple;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.common.message.MessageExt;
-
+import org.apache.rocketmq.common.message.MessageQueue;
 
 public class LitePullConsumerTest {
     public static void main(String[] args) throws Exception {
         DefaultLitePullConsumer litePullConsumer = new 
DefaultLitePullConsumer("test");
-        litePullConsumer.setNamesrvAddr("localhost:9876");
-        litePullConsumer.setAutoCommit(true);
-        litePullConsumer.subscribe("test41","TagA" );
+        litePullConsumer.setAutoCommit(false);
         litePullConsumer.start();
+        Collection<MessageQueue> mqSet = 
litePullConsumer.fetchMessageQueues("test400");
+        List<MessageQueue> list = new ArrayList<>(mqSet);
+        Collection<MessageQueue> assginMq = 
Collections.singletonList(list.get(0));
+        litePullConsumer.assign(assginMq);
+        int size = 0;
+        litePullConsumer.seek(list.get(0), 26);
 
-        int i = 0;
         while (true) {
             List<MessageExt> messageExts = litePullConsumer.poll();
-            System.out.printf("%s%n", messageExts);
+            if (messageExts != null) {
+                size += messageExts.size();
+            }
+            litePullConsumer.commitSync();
+            System.out.printf("%s %d %n", messageExts, size);
         }
 
     }

Reply via email to