This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new bc5adc5  refactor(rebalance): use smart_ptr to manage pull request 
(#206)
bc5adc5 is described below

commit bc5adc57eda7145486b0a8851f65daab2530d9bb
Author: dinglei <[email protected]>
AuthorDate: Wed Dec 11 15:45:26 2019 +0800

    refactor(rebalance): use smart_ptr to manage pull request (#206)
    
    refactor(rebalance): use smart_ptr to manage pull request
---
 include/DefaultMQPullConsumer.h                    |  15 +-
 include/DefaultMQPushConsumer.h                    |  14 +-
 include/MQConsumer.h                               |   2 +-
 src/MQClientFactory.cpp                            |   8 +-
 src/consumer/AllocateMQStrategy.h                  |   2 +-
 src/consumer/ConsumeMessageConcurrentlyService.cpp |  33 +-
 src/consumer/ConsumeMessageOrderlyService.cpp      |  33 +-
 src/consumer/ConsumeMsgService.h                   |  16 +-
 src/consumer/DefaultMQPullConsumer.cpp             |   6 +-
 src/consumer/DefaultMQPushConsumer.cpp             | 458 ++++++++++++---------
 src/consumer/FindBrokerResult.h                    |   2 +-
 src/consumer/OffsetStore.cpp                       |   4 +-
 src/consumer/OffsetStore.h                         |   2 +-
 src/consumer/PullAPIWrapper.cpp                    |   2 +-
 src/consumer/PullAPIWrapper.h                      |   2 +-
 src/consumer/PullRequest.cpp                       |  58 +--
 src/consumer/PullRequest.h                         |  17 +-
 src/consumer/PullResult.cpp                        |   2 +-
 src/consumer/PullResultExt.h                       |   2 +-
 src/consumer/Rebalance.cpp                         | 298 +++++++-------
 src/consumer/Rebalance.h                           |  29 +-
 src/consumer/SubscriptionData.cpp                  |   4 +-
 src/consumer/SubscriptionData.h                    |   2 +-
 23 files changed, 588 insertions(+), 423 deletions(-)

diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
index 4f6ef92..af01941 100644
--- a/include/DefaultMQPullConsumer.h
+++ b/include/DefaultMQPullConsumer.h
@@ -54,7 +54,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public 
MQConsumer {
   virtual void getSubscriptions(std::vector<SubscriptionData>&);
   virtual void updateConsumeOffset(const MQMessageQueue& mq, int64 offset);
   virtual void removeConsumeOffset(const MQMessageQueue& mq);
-  virtual void producePullMsgTask(PullRequest*);
+  virtual bool producePullMsgTask(boost::weak_ptr<PullRequest> pullRequest);
   virtual Rebalance* getRebalance() const;
   //<!end MQConsumer;
 
@@ -67,7 +67,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public 
MQConsumer {
    * @param subExpression
    *            set filter expression for pulled msg, broker will filter msg 
actively
    *            Now only OR operation is supported, eg: "tag1 || tag2 || tag3"
-   *            if subExpression is setted to "null" or "*"��all msg will be 
subscribed
+   *            if subExpression is setted to "null" or "*", all msg will be 
subscribed
    * @param offset
    *            specify the started pull offset
    * @param maxNums
@@ -90,7 +90,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public 
MQConsumer {
    * @param subExpression
    *            set filter expression for pulled msg, broker will filter msg 
actively
    *            Now only OR operation is supported, eg: "tag1 || tag2 || tag3"
-   *            if subExpression is setted to "null" or "*"��all msg will be 
subscribed
+   *            if subExpression is setted to "null" or "*", all msg will be 
subscribed
    * @param offset
    *            specify the started pull offset
    * @param maxNums
@@ -107,20 +107,13 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public 
MQConsumer {
 
   virtual ConsumerRunningInfo* getConsumerRunningInfo() { return NULL; }
   /**
-   * ��ȡ���ѽ��ȣ�����-1��ʾ����
    *
    * @param mq
    * @param fromStore
    * @return
    */
   int64 fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore);
-  /**
-   * ����topic��ȡMessageQueue���Ծ��ⷽʽ�����ڶ����Ա֮�����
-   *
-   * @param topic
-   *            ��ϢTopic
-   * @return ���ض��м���
-   */
+
   void fetchMessageQueuesInBalance(const std::string& topic, 
std::vector<MQMessageQueue> mqs);
 
   // temp persist consumer offset interface, only valid with
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index 61fcde1..b6de085 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -86,13 +86,17 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public 
MQConsumer {
   virtual Rebalance* getRebalance() const;
   ConsumeMsgService* getConsumerMsgService() const;
 
-  virtual void producePullMsgTask(PullRequest*);
-  void triggerNextPullRequest(boost::asio::deadline_timer* t, PullRequest* 
request);
+  virtual bool producePullMsgTask(boost::weak_ptr<PullRequest>);
+  virtual bool producePullMsgTaskLater(boost::weak_ptr<PullRequest>, int 
millis);
+  static void static_triggerNextPullRequest(void* context,
+                                            boost::asio::deadline_timer* t,
+                                            boost::weak_ptr<PullRequest>);
+  void triggerNextPullRequest(boost::asio::deadline_timer* t, 
boost::weak_ptr<PullRequest>);
   void runPullMsgQueue(TaskQueue* pTaskQueue);
-  void pullMessage(PullRequest* pullrequest);       // sync pullMsg
-  void pullMessageAsync(PullRequest* pullrequest);  // async pullMsg
+  void pullMessage(boost::weak_ptr<PullRequest> pullrequest);       // sync 
pullMsg
+  void pullMessageAsync(boost::weak_ptr<PullRequest> pullrequest);  // async 
pullMsg
   void setAsyncPull(bool asyncFlag);
-  AsyncPullCallback* getAsyncPullCallBack(PullRequest* request, MQMessageQueue 
msgQueue);
+  AsyncPullCallback* getAsyncPullCallBack(boost::weak_ptr<PullRequest>, 
MQMessageQueue msgQueue);
   void shutdownAsyncPullCallBack();
 
   /*
diff --git a/include/MQConsumer.h b/include/MQConsumer.h
index cc25327..b6cd613 100644
--- a/include/MQConsumer.h
+++ b/include/MQConsumer.h
@@ -43,7 +43,7 @@ class ROCKETMQCLIENT_API MQConsumer : public MQClient {
   virtual ConsumeType getConsumeType() = 0;
   virtual ConsumeFromWhere getConsumeFromWhere() = 0;
   virtual void getSubscriptions(std::vector<SubscriptionData>&) = 0;
-  virtual void producePullMsgTask(PullRequest*) = 0;
+  virtual bool producePullMsgTask(boost::weak_ptr<PullRequest>) = 0;
   virtual Rebalance* getRebalance() const = 0;
   virtual PullResult pull(const MQMessageQueue& mq, const std::string& 
subExpression, int64 offset, int maxNums) = 0;
   virtual void pull(const MQMessageQueue& mq,
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 99c4ffd..03d4640 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -1040,10 +1040,12 @@ void MQClientFactory::resetOffset(const string& group,
 
     for (; it != offsetTable.end(); ++it) {
       MQMessageQueue mq = it->first;
-      PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
+      boost::weak_ptr<PullRequest> pullRequest = 
pConsumer->getRebalance()->getPullRequest(mq);
+      boost::shared_ptr<PullRequest> pullreq = pullRequest.lock();
+      // PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
       if (pullreq) {
-        pullreq->setDroped(true);
-        LOG_INFO("resetOffset setDroped for mq:%s", mq.toString().data());
+        pullreq->setDropped(true);
+        LOG_INFO("resetOffset setDropped for mq:%s", mq.toString().data());
         pullreq->clearAllMsgs();
         pullreq->updateQueueMaxOffset(it->second);
       } else {
diff --git a/src/consumer/AllocateMQStrategy.h 
b/src/consumer/AllocateMQStrategy.h
index 4613040..e24966c 100644
--- a/src/consumer/AllocateMQStrategy.h
+++ b/src/consumer/AllocateMQStrategy.h
@@ -92,5 +92,5 @@ class AllocateMQAveragely : public AllocateMQStrategy {
 };
 
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
 #endif
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp 
b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 9069350..e5df16e 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -60,19 +60,35 @@ MessageListenerType 
ConsumeMessageConcurrentlyService::getConsumeMsgSerivceListe
   return m_pMessageListener->getMessageListenerType();
 }
 
-void ConsumeMessageConcurrentlyService::submitConsumeRequest(PullRequest* 
request, vector<MQMessageExt>& msgs) {
+void 
ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<PullRequest>
 pullRequest,
+                                                             
vector<MQMessageExt>& msgs) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_WARN("Pull request has been released");
+    return;
+  }
+  if (request->isDropped()) {
+    LOG_INFO("Pull request for %s is dropped, which will be released in next 
re-balance.",
+             request->m_messageQueue.toString().c_str());
+    return;
+  }
   
m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest,
 this, request, msgs));
 }
 
-void ConsumeMessageConcurrentlyService::ConsumeRequest(PullRequest* request, 
vector<MQMessageExt>& msgs) {
-  if (!request || request->isDroped()) {
-    LOG_WARN("the pull result is NULL or Had been dropped");
+void 
ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullRequest> 
pullRequest,
+                                                       vector<MQMessageExt>& 
msgs) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_WARN("Pull request has been released");
+    return;
+  }
+  if (!request || request->isDropped()) {
+    LOG_WARN("the pull request had been dropped");
     request->clearAllMsgs();  // add clear operation to avoid bad state when
                               // dropped pullRequest returns normal
     return;
   }
 
-  //<!��ȡ����;
   if (msgs.empty()) {
     LOG_WARN("the msg of pull result is NULL,its mq:%s", 
(request->m_messageQueue).toString().c_str());
     return;
@@ -123,12 +139,11 @@ void 
ConsumeMessageConcurrentlyService::ConsumeRequest(PullRequest* request, vec
 
   // update offset
   int64 offset = request->removeMessage(msgs);
-  // LOG_DEBUG("update offset:%lld of mq: %s",
-  // offset,(request->m_messageQueue).toString().c_str());
   if (offset >= 0) {
     m_pConsumer->updateConsumeOffset(request->m_messageQueue, offset);
   } else {
-    LOG_WARN("Note: accumulation consume occurs on mq:%s", 
(request->m_messageQueue).toString().c_str());
+    LOG_WARN("Note: Get local offset for mq:%s failed, may be it is updated 
before. skip..",
+             (request->m_messageQueue).toString().c_str());
   }
 }
 
@@ -144,4 +159,4 @@ void 
ConsumeMessageConcurrentlyService::resetRetryTopic(vector<MQMessageExt>& ms
 }
 
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/consumer/ConsumeMessageOrderlyService.cpp 
b/src/consumer/ConsumeMessageOrderlyService.cpp
index 800bb4d..f68fe44 100644
--- a/src/consumer/ConsumeMessageOrderlyService.cpp
+++ b/src/consumer/ConsumeMessageOrderlyService.cpp
@@ -103,14 +103,25 @@ MessageListenerType 
ConsumeMessageOrderlyService::getConsumeMsgSerivceListenerTy
   return m_pMessageListener->getMessageListenerType();
 }
 
-void ConsumeMessageOrderlyService::submitConsumeRequest(PullRequest* request, 
vector<MQMessageExt>& msgs) {
+void 
ConsumeMessageOrderlyService::submitConsumeRequest(boost::weak_ptr<PullRequest> 
pullRequest,
+                                                        vector<MQMessageExt>& 
msgs) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_WARN("Pull request has been released");
+    return;
+  }
   m_ioService.post(boost::bind(&ConsumeMessageOrderlyService::ConsumeRequest, 
this, request));
 }
 
 void ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* 
context,
-                                                                    
PullRequest* request,
+                                                                    
boost::weak_ptr<PullRequest> pullRequest,
                                                                     bool 
tryLockMQ,
                                                                     
boost::asio::deadline_timer* t) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_WARN("Pull request has been released");
+    return;
+  }
   LOG_INFO("submit consumeRequest later for mq:%s", 
request->m_messageQueue.toString().c_str());
   vector<MQMessageExt> msgs;
   ConsumeMessageOrderlyService* orderlyService = 
(ConsumeMessageOrderlyService*)context;
@@ -122,7 +133,12 @@ void 
ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* contex
     deleteAndZero(t);
 }
 
-void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) {
+void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> 
pullRequest) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_WARN("Pull request has been released");
+    return;
+  }
   bool bGetMutex = false;
   boost::unique_lock<boost::timed_mutex> 
lock(request->getPullRequestCriticalSection(), boost::try_to_lock);
   if (!lock.owns_lock()) {
@@ -140,7 +156,7 @@ void 
ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) {
     // request->m_messageQueue.toString().c_str());
     return;
   }
-  if (!request || request->isDroped()) {
+  if (!request || request->isDropped()) {
     LOG_WARN("the pull result is NULL or Had been dropped");
     request->clearAllMsgs();  // add clear operation to avoid bad state when
                               // dropped pullRequest returns normal
@@ -189,11 +205,16 @@ void 
ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) {
     }
   }
 }
-void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(PullRequest* 
request, bool tryLockMQ) {
+void 
ConsumeMessageOrderlyService::tryLockLaterAndReconsume(boost::weak_ptr<PullRequest>
 pullRequest, bool tryLockMQ) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_WARN("Pull request has been released");
+    return;
+  }
   int retryTimer = tryLockMQ ? 500 : 100;
   boost::asio::deadline_timer* t =
       new boost::asio::deadline_timer(m_async_ioService, 
boost::posix_time::milliseconds(retryTimer));
   t->async_wait(
       
boost::bind(&(ConsumeMessageOrderlyService::static_submitConsumeRequestLater), 
this, request, tryLockMQ, t));
 }
-}
+}  // namespace rocketmq
diff --git a/src/consumer/ConsumeMsgService.h b/src/consumer/ConsumeMsgService.h
index 2bb7979..a5d3ce7 100644
--- a/src/consumer/ConsumeMsgService.h
+++ b/src/consumer/ConsumeMsgService.h
@@ -38,7 +38,7 @@ class ConsumeMsgService {
   virtual void start() {}
   virtual void shutdown() {}
   virtual void stopThreadPool() {}
-  virtual void submitConsumeRequest(PullRequest* request, 
vector<MQMessageExt>& msgs) {}
+  virtual void submitConsumeRequest(boost::weak_ptr<PullRequest> request, 
vector<MQMessageExt>& msgs) {}
   virtual MessageListenerType getConsumeMsgSerivceListenerType() { return 
messageListenerDefaultly; }
 };
 
@@ -48,11 +48,11 @@ class ConsumeMessageConcurrentlyService : public 
ConsumeMsgService {
   virtual ~ConsumeMessageConcurrentlyService();
   virtual void start();
   virtual void shutdown();
-  virtual void submitConsumeRequest(PullRequest* request, 
vector<MQMessageExt>& msgs);
+  virtual void submitConsumeRequest(boost::weak_ptr<PullRequest> request, 
vector<MQMessageExt>& msgs);
   virtual MessageListenerType getConsumeMsgSerivceListenerType();
   virtual void stopThreadPool();
 
-  void ConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs);
+  void ConsumeRequest(boost::weak_ptr<PullRequest> request, 
vector<MQMessageExt>& msgs);
 
  private:
   void resetRetryTopic(vector<MQMessageExt>& msgs);
@@ -71,17 +71,17 @@ class ConsumeMessageOrderlyService : public 
ConsumeMsgService {
   virtual ~ConsumeMessageOrderlyService();
   virtual void start();
   virtual void shutdown();
-  virtual void submitConsumeRequest(PullRequest* request, 
vector<MQMessageExt>& msgs);
+  virtual void submitConsumeRequest(boost::weak_ptr<PullRequest> request, 
vector<MQMessageExt>& msgs);
   virtual void stopThreadPool();
   virtual MessageListenerType getConsumeMsgSerivceListenerType();
 
   void boost_asio_work();
-  void tryLockLaterAndReconsume(PullRequest* request, bool tryLockMQ);
+  void tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> request, bool 
tryLockMQ);
   static void static_submitConsumeRequestLater(void* context,
-                                               PullRequest* request,
+                                               boost::weak_ptr<PullRequest> 
request,
                                                bool tryLockMQ,
                                                boost::asio::deadline_timer* t);
-  void ConsumeRequest(PullRequest* request);
+  void ConsumeRequest(boost::weak_ptr<PullRequest> request);
   void lockMQPeriodically(boost::system::error_code& ec, 
boost::asio::deadline_timer* t);
   void unlockAllMQ();
   bool lockOneMQ(const MQMessageQueue& mq);
@@ -99,6 +99,6 @@ class ConsumeMessageOrderlyService : public ConsumeMsgService 
{
 };
 
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif  //<! _CONSUMEMESSAGESERVICE_H_
diff --git a/src/consumer/DefaultMQPullConsumer.cpp 
b/src/consumer/DefaultMQPullConsumer.cpp
index 2175feb..d3ce978 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -369,11 +369,13 @@ void 
DefaultMQPullConsumer::getSubscriptions(vector<SubscriptionData>& result) {
   }
 }
 
-void DefaultMQPullConsumer::producePullMsgTask(PullRequest*) {}
+bool DefaultMQPullConsumer::producePullMsgTask(boost::weak_ptr<PullRequest> 
pullRequest) {
+  return true;
+}
 
 Rebalance* DefaultMQPullConsumer::getRebalance() const {
   return NULL;
 }
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/consumer/DefaultMQPushConsumer.cpp 
b/src/consumer/DefaultMQPushConsumer.cpp
index 7e47e15..0cd5427 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -23,8 +23,6 @@
 #include "Logging.h"
 #include "MQClientAPIImpl.h"
 #include "MQClientFactory.h"
-#include "MQClientManager.h"
-#include "MQProtos.h"
 #include "OffsetStore.h"
 #include "PullAPIWrapper.h"
 #include "PullSysFlag.h"
@@ -37,150 +35,157 @@ namespace rocketmq {
 
 class AsyncPullCallback : public PullCallback {
  public:
-  AsyncPullCallback(DefaultMQPushConsumer* pushConsumer, PullRequest* request)
+  AsyncPullCallback(DefaultMQPushConsumer* pushConsumer, 
boost::weak_ptr<PullRequest> request)
       : m_callbackOwner(pushConsumer), m_pullRequest(request), 
m_bShutdown(false) {}
 
-  virtual ~AsyncPullCallback() {
-    m_callbackOwner = NULL;
-    m_pullRequest = NULL;
-  }
+  virtual ~AsyncPullCallback() { m_callbackOwner = NULL; }
 
   virtual void onSuccess(MQMessageQueue& mq, PullResult& result, bool 
bProducePullRequest) {
-    if (m_bShutdown == true) {
-      LOG_INFO("pullrequest for:%s in shutdown, return", 
(m_pullRequest->m_messageQueue).toString().c_str());
-      m_pullRequest->removePullMsgEvent();
+    boost::shared_ptr<PullRequest> pullRequest = m_pullRequest.lock();
+    if (!pullRequest) {
+      LOG_WARN("Pull request for[%s] has been released", 
mq.toString().c_str());
       return;
     }
 
+    if (m_bShutdown) {
+      LOG_INFO("pullrequest for:%s in shutdown, return", 
(pullRequest->m_messageQueue).toString().c_str());
+      return;
+    }
+    if (pullRequest->isDropped()) {
+      LOG_INFO("Pull request for queue[%s] has been set as dropped. Will NOT 
pull this queue any more",
+               pullRequest->m_messageQueue.toString().c_str());
+      return;
+    }
     switch (result.pullStatus) {
       case FOUND: {
-        if (!m_pullRequest->isDroped())  // if request is setted to dropped,
-                                         // don't add msgFoundList to
-                                         // m_msgTreeMap and don't call
-                                         // producePullMsgTask
-        {                                // avoid issue: pullMsg is sent out, 
rebalance is doing concurrently
-          // and this request is dropped, and then received pulled msgs.
-          m_pullRequest->setNextOffset(result.nextBeginOffset);
-          m_pullRequest->putMessage(result.msgFoundList);
-
-          
m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(m_pullRequest, 
result.msgFoundList);
-
-          if (bProducePullRequest)
-            m_callbackOwner->producePullMsgTask(m_pullRequest);
-          else
-            m_pullRequest->removePullMsgEvent();
-
-          LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ", nextBeginOffset:%lld",
-                    (m_pullRequest->m_messageQueue).toString().c_str(), 
result.msgFoundList.size(),
-                    result.nextBeginOffset);
+        if (pullRequest->isDropped()) {
+          LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", 
(pullRequest->m_messageQueue).toString().c_str());
+          break;
+        }
+        pullRequest->setNextOffset(result.nextBeginOffset);
+        pullRequest->putMessage(result.msgFoundList);
+
+        
m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(pullRequest, 
result.msgFoundList);
+
+        if (bProducePullRequest) {
+          m_callbackOwner->producePullMsgTask(pullRequest);
         } else {
-          LOG_INFO("remove pullmsg event of mq:%s", 
(m_pullRequest->m_messageQueue).toString().c_str());
-          m_pullRequest->removePullMsgEvent();
+          LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
+                   (pullRequest->m_messageQueue).toString().c_str());
         }
+
+        LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ", nextBeginOffset:%lld",
+                  (pullRequest->m_messageQueue).toString().c_str(), 
result.msgFoundList.size(), result.nextBeginOffset);
+
         break;
       }
       case NO_NEW_MSG: {
-        m_pullRequest->setNextOffset(result.nextBeginOffset);
+        if (pullRequest->isDropped()) {
+          LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", 
(pullRequest->m_messageQueue).toString().c_str());
+          break;
+        }
+        pullRequest->setNextOffset(result.nextBeginOffset);
 
         vector<MQMessageExt> msgs;
-        m_pullRequest->getMessage(msgs);
+        pullRequest->getMessage(msgs);
         if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
-          /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
-          is kept, then consumer will enter following situation:
-          1>. get pull offset with 0 when do rebalance, and set
-          m_offsetTable[mq] to 0;
-          2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
-          offset increase by 800
-          3>. request->getMessage(msgs) always NULL
-          4>. we need update consumerOffset to nextBeginOffset indicated by
-          broker
-          but if really no new msg could be pulled, also go to this CASE
-
-          LOG_INFO("maybe misMatch between broker and client happens, update
-          consumerOffset to nextBeginOffset indicated by broker");*/
-          m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue, 
result.nextBeginOffset);
+          m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, 
result.nextBeginOffset);
         }
-        if (bProducePullRequest)
-          m_callbackOwner->producePullMsgTask(m_pullRequest);
-        else
-          m_pullRequest->removePullMsgEvent();
-
-        /*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld",
-                 (m_pullRequest->m_messageQueue).toString().c_str(),
-                 result.nextBeginOffset);*/
+        if (bProducePullRequest) {
+          m_callbackOwner->producePullMsgTask(pullRequest);
+        } else {
+          LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
+                   (pullRequest->m_messageQueue).toString().c_str());
+        }
+        LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld", 
pullRequest->m_messageQueue.toString().c_str(),
+                  result.nextBeginOffset);
         break;
       }
       case NO_MATCHED_MSG: {
-        m_pullRequest->setNextOffset(result.nextBeginOffset);
+        if (pullRequest->isDropped()) {
+          LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", 
(pullRequest->m_messageQueue).toString().c_str());
+          break;
+        }
+        pullRequest->setNextOffset(result.nextBeginOffset);
 
         vector<MQMessageExt> msgs;
-        m_pullRequest->getMessage(msgs);
+        pullRequest->getMessage(msgs);
         if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
-          /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
-          is kept, then consumer will enter following situation:
-          1>. get pull offset with 0 when do rebalance, and set
-          m_offsetTable[mq] to 0;
-          2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
-          offset increase by 800
-          3>. request->getMessage(msgs) always NULL
-          4>. we need update consumerOffset to nextBeginOffset indicated by
-          broker
-          but if really no new msg could be pulled, also go to this CASE
-
-          LOG_INFO("maybe misMatch between broker and client happens, update
-          consumerOffset to nextBeginOffset indicated by broker");*/
-          m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue, 
result.nextBeginOffset);
+          m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, 
result.nextBeginOffset);
+        }
+        if (bProducePullRequest) {
+          m_callbackOwner->producePullMsgTask(pullRequest);
+        } else {
+          LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
+                   (pullRequest->m_messageQueue).toString().c_str());
         }
-        if (bProducePullRequest)
-          m_callbackOwner->producePullMsgTask(m_pullRequest);
-        else
-          m_pullRequest->removePullMsgEvent();
-        /*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
-                 (m_pullRequest->m_messageQueue).toString().c_str(),
-                 result.nextBeginOffset);*/
+        LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld", 
pullRequest->m_messageQueue.toString().c_str(),
+                  result.nextBeginOffset);
         break;
       }
       case OFFSET_ILLEGAL: {
-        m_pullRequest->setNextOffset(result.nextBeginOffset);
-        if (bProducePullRequest)
-          m_callbackOwner->producePullMsgTask(m_pullRequest);
-        else
-          m_pullRequest->removePullMsgEvent();
-
-        /*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
-                 (m_pullRequest->m_messageQueue).toString().c_str(),
-                 result.nextBeginOffset);*/
+        if (pullRequest->isDropped()) {
+          LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", 
(pullRequest->m_messageQueue).toString().c_str());
+          break;
+        }
+        pullRequest->setNextOffset(result.nextBeginOffset);
+        if (bProducePullRequest) {
+          m_callbackOwner->producePullMsgTask(pullRequest);
+        } else {
+          LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
+                   (pullRequest->m_messageQueue).toString().c_str());
+        }
+
+        LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld", 
pullRequest->m_messageQueue.toString().c_str(),
+                  result.nextBeginOffset);
         break;
       }
-      case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker
-        // will not returns this status, so this case
-        // could not be entered.
+      case BROKER_TIMEOUT: {
+        if (pullRequest->isDropped()) {
+          LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", 
(pullRequest->m_messageQueue).toString().c_str());
+          break;
+        }
         LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
-        m_pullRequest->setNextOffset(result.nextBeginOffset);
-        if (bProducePullRequest)
-          m_callbackOwner->producePullMsgTask(m_pullRequest);
-        else
-          m_pullRequest->removePullMsgEvent();
+        pullRequest->setNextOffset(result.nextBeginOffset);
+        if (bProducePullRequest) {
+          m_callbackOwner->producePullMsgTask(pullRequest);
+        } else {
+          LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
+                   (pullRequest->m_messageQueue).toString().c_str());
+        }
         break;
       }
     }
   }
 
   virtual void onException(MQException& e) {
-    if (m_bShutdown == true) {
-      LOG_INFO("pullrequest for:%s in shutdown, return", 
(m_pullRequest->m_messageQueue).toString().c_str());
-      m_pullRequest->removePullMsgEvent();
+    boost::shared_ptr<PullRequest> pullRequest = m_pullRequest.lock();
+    if (!pullRequest) {
+      LOG_WARN("Pull request has been released.");
       return;
     }
-    LOG_WARN("pullrequest for:%s occurs exception, reproduce it", 
(m_pullRequest->m_messageQueue).toString().c_str());
-    m_callbackOwner->producePullMsgTask(m_pullRequest);
+    std::string queueName = pullRequest->m_messageQueue.toString();
+    if (m_bShutdown) {
+      LOG_INFO("pullrequest for:%s in shutdown, return", queueName.c_str());
+      return;
+    }
+    if (pullRequest->isDropped()) {
+      LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", queueName.c_str());
+      return;
+    }
+    LOG_WARN("Pullrequest for:%s occurs exception, reproduce it after 1s.", 
queueName.c_str());
+    m_callbackOwner->producePullMsgTaskLater(pullRequest, 1000);
   }
 
   void setShutdownStatus() { m_bShutdown = true; }
 
+  const boost::weak_ptr<PullRequest>& getPullRequest() const { return 
m_pullRequest; }
+
+  void setPullRequest(boost::weak_ptr<PullRequest>& pullRequest) { 
m_pullRequest = pullRequest; }
+
  private:
   DefaultMQPushConsumer* m_callbackOwner;
-  PullRequest* m_pullRequest;
+  boost::weak_ptr<PullRequest> m_pullRequest;
   bool m_bShutdown;
 };
 
@@ -524,38 +529,84 @@ void DefaultMQPushConsumer::removeConsumeOffset(const 
MQMessageQueue& mq) {
   m_pOffsetStore->removeOffset(mq);
 }
 
-void 
DefaultMQPushConsumer::triggerNextPullRequest(boost::asio::deadline_timer* t, 
PullRequest* request) {
-  // LOG_INFO("trigger pullrequest for:%s",
-  // (request->m_messageQueue).toString().c_str());
-  producePullMsgTask(request);
+void DefaultMQPushConsumer::static_triggerNextPullRequest(void* context,
+                                                          
boost::asio::deadline_timer* t,
+                                                          
boost::weak_ptr<PullRequest> pullRequest) {
+  if (pullRequest.expired()) {
+    LOG_WARN("Pull request has been released before.");
+    return;
+  }
+  DefaultMQPushConsumer* pDefaultMQPushConsumer = 
(DefaultMQPushConsumer*)context;
+  if (pDefaultMQPushConsumer) {
+    pDefaultMQPushConsumer->triggerNextPullRequest(t, pullRequest);
+  }
+}
+
+void 
DefaultMQPushConsumer::triggerNextPullRequest(boost::asio::deadline_timer* t,
+                                                   
boost::weak_ptr<PullRequest> pullRequest) {
+  // delete first to avoild memleak
   deleteAndZero(t);
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_WARN("Pull request has been released before.");
+    return;
+  }
+  producePullMsgTask(request);
 }
 
-void DefaultMQPushConsumer::producePullMsgTask(PullRequest* request) {
+bool 
DefaultMQPushConsumer::producePullMsgTaskLater(boost::weak_ptr<PullRequest> 
pullRequest, int millis) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_INFO("Pull request is invalid. Maybe it is dropped before.");
+    return false;
+  }
+  if (request->isDropped()) {
+    LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", 
request->m_messageQueue.toString().c_str());
+    return false;
+  }
+  boost::asio::deadline_timer* t =
+      new boost::asio::deadline_timer(m_async_ioService, 
boost::posix_time::milliseconds(millis));
+  
t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest),
 this, t, request));
+  LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.", 
(request->m_messageQueue).toString().c_str(), millis);
+  return true;
+}
+
+bool DefaultMQPushConsumer::producePullMsgTask(boost::weak_ptr<PullRequest> 
pullRequest) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_WARN("Pull request has been released.");
+    return false;
+  }
+  if (request->isDropped()) {
+    LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", 
request->m_messageQueue.toString().c_str());
+    return false;
+  }
   if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {
-    request->addPullMsgEvent();
     if (m_asyncPull) {
       
m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessageAsync,
 this, request));
     } else {
       
m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessage, 
this, request));
     }
   } else {
-    LOG_WARN("produce pullmsg of mq:%s failed", 
request->m_messageQueue.toString().c_str());
+    LOG_WARN("produce PullRequest of mq:%s failed", 
request->m_messageQueue.toString().c_str());
+    return false;
   }
+  return true;
 }
 
 void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue* pTaskQueue) {
   pTaskQueue->run();
 }
 
-void DefaultMQPushConsumer::pullMessage(PullRequest* request) {
-  if (request == NULL) {
-    LOG_ERROR("Pull request is NULL, return");
+void DefaultMQPushConsumer::pullMessage(boost::weak_ptr<PullRequest> 
pullRequest) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_ERROR("Pull request is released, return");
     return;
   }
-  if (request->isDroped()) {
+  if (request->isDropped()) {
     LOG_WARN("Pull request is set drop with mq:%s, return", 
(request->m_messageQueue).toString().c_str());
-    request->removePullMsgEvent();
+    // request->removePullMsgEvent();
     return;
   }
 
@@ -563,19 +614,19 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* 
request) {
   if (m_consumerService->getConsumeMsgSerivceListenerType() == 
messageListenerOrderly) {
     if (!request->isLocked() || request->isLockExpired()) {
       if (!m_pRebalance->lock(messageQueue)) {
-        producePullMsgTask(request);
+        request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+        producePullMsgTaskLater(request, 1000);
         return;
       }
     }
   }
 
   if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
-    // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
-    // than:%d",  (request->m_messageQueue).toString().c_str(),
-    // request->getCacheMsgCount(), m_maxMsgCacheSize);
-    boost::asio::deadline_timer* t =
-        new boost::asio::deadline_timer(m_async_ioService, 
boost::posix_time::milliseconds(1 * 1000));
-    t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest, 
this, t, request));
+    LOG_INFO("Sync Pull request for %s has Cached with %d Messages and The Max 
size is %d, Sleep 1s.",
+             (request->m_messageQueue).toString().c_str(), 
request->getCacheMsgCount(), m_maxMsgCacheSize);
+    request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+    // Retry 1s,
+    producePullMsgTaskLater(request, 1000);
     return;
   }
 
@@ -591,7 +642,9 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* 
request) {
   string subExpression;
   SubscriptionData* pSdata = 
m_pRebalance->getSubscriptionData(messageQueue.getTopic());
   if (pSdata == NULL) {
-    producePullMsgTask(request);
+    LOG_INFO("Can not get SubscriptionData of Pull request for [%s], Sleep 
1s.",
+             (request->m_messageQueue).toString().c_str());
+    producePullMsgTaskLater(request, 1000);
     return;
   }
   subExpression = pSdata->getSubString();
@@ -600,7 +653,10 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* 
request) {
                                           false,                   // suspend
                                           !subExpression.empty(),  // 
subscription
                                           false);                  // class 
filter
-
+  if (request->isDropped()) {
+    LOG_WARN("Pull request is set as dropped with mq:%s, return", 
(request->m_messageQueue).toString().c_str());
+    return;
+  }
   try {
     request->setLastPullTimestamp(UtilAll::currentTimeMillis());
     unique_ptr<PullResult> 
result(m_pPullAPIWrapper->pullKernelImpl(messageQueue,              // 1
@@ -616,45 +672,35 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* 
request) {
                                                                     NULL, 
getSessionCredentials()));
 
     PullResult pullResult = m_pPullAPIWrapper->processPullResult(messageQueue, 
result.get(), pSdata);
-
     switch (pullResult.pullStatus) {
       case FOUND: {
-        if (!request->isDroped())  // if request is setted to dropped, don't 
add
-                                   // msgFoundList to m_msgTreeMap and don't
-                                   // call producePullMsgTask
-        {                          // avoid issue: pullMsg is sent out, 
rebalance is doing concurrently
-          // and this request is dropped, and then received pulled msgs.
-          request->setNextOffset(pullResult.nextBeginOffset);
-          request->putMessage(pullResult.msgFoundList);
-
-          m_consumerService->submitConsumeRequest(request, 
pullResult.msgFoundList);
-          producePullMsgTask(request);
-
-          LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld", 
messageQueue.toString().c_str(),
-                    pullResult.msgFoundList.size(), 
pullResult.nextBeginOffset);
-        } else {
-          request->removePullMsgEvent();
+        if (request->isDropped()) {
+          LOG_INFO("Get pull result but the queue has been marked as dropped. 
Queue: %s",
+                   messageQueue.toString().c_str());
+          break;
         }
+        // and this request is dropped, and then received pulled msgs.
+        request->setNextOffset(pullResult.nextBeginOffset);
+        request->putMessage(pullResult.msgFoundList);
+
+        m_consumerService->submitConsumeRequest(request, 
pullResult.msgFoundList);
+        producePullMsgTask(request);
+
+        LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld", 
messageQueue.toString().c_str(),
+                  pullResult.msgFoundList.size(), pullResult.nextBeginOffset);
+
         break;
       }
       case NO_NEW_MSG: {
+        if (request->isDropped()) {
+          LOG_INFO("Get pull result but the queue has been marked as dropped. 
Queue: %s",
+                   messageQueue.toString().c_str());
+          break;
+        }
         request->setNextOffset(pullResult.nextBeginOffset);
         vector<MQMessageExt> msgs;
         request->getMessage(msgs);
         if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
-          /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
-          is kept, then consumer will enter following situation:
-          1>. get pull offset with 0 when do rebalance, and set
-          m_offsetTable[mq] to 0;
-          2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
-          offset increase by 800
-          3>. request->getMessage(msgs) always NULL
-          4>. we need update consumerOffset to nextBeginOffset indicated by
-          broker
-          but if really no new msg could be pulled, also go to this CASE
-       */
-          // LOG_DEBUG("maybe misMatch between broker and client happens, 
update
-          // consumerOffset to nextBeginOffset indicated by broker");
           updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
         }
         producePullMsgTask(request);
@@ -662,12 +708,15 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* 
request) {
         break;
       }
       case NO_MATCHED_MSG: {
+        if (request->isDropped()) {
+          LOG_INFO("Get pull result but the queue has been marked as dropped. 
Queue: %s",
+                   messageQueue.toString().c_str());
+          break;
+        }
         request->setNextOffset(pullResult.nextBeginOffset);
         vector<MQMessageExt> msgs;
         request->getMessage(msgs);
         if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
-          // LOG_DEBUG("maybe misMatch between broker and client happens, 
update
-          // consumerOffset to nextBeginOffset indicated by broker");
           updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
         }
         producePullMsgTask(request);
@@ -677,6 +726,11 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* 
request) {
         break;
       }
       case OFFSET_ILLEGAL: {
+        if (request->isDropped()) {
+          LOG_INFO("Get pull result but the queue has been marked as dropped. 
Queue: %s",
+                   messageQueue.toString().c_str());
+          break;
+        }
         request->setNextOffset(pullResult.nextBeginOffset);
         producePullMsgTask(request);
 
@@ -695,11 +749,17 @@ void DefaultMQPushConsumer::pullMessage(PullRequest* 
request) {
     }
   } catch (MQException& e) {
     LOG_ERROR(e.what());
-    producePullMsgTask(request);
+    LOG_WARN("Pull %s occur exception, restart 1s  later.", 
messageQueue.toString().c_str());
+    producePullMsgTaskLater(request, 1000);
   }
 }
 
-AsyncPullCallback* DefaultMQPushConsumer::getAsyncPullCallBack(PullRequest* 
request, MQMessageQueue msgQueue) {
+AsyncPullCallback* 
DefaultMQPushConsumer::getAsyncPullCallBack(boost::weak_ptr<PullRequest> 
pullRequest,
+                                                               MQMessageQueue 
msgQueue) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    return NULL;
+  }
   boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
   if (m_asyncPull && request) {
     PullMAP::iterator it = m_PullCallback.find(msgQueue);
@@ -707,7 +767,11 @@ AsyncPullCallback* 
DefaultMQPushConsumer::getAsyncPullCallBack(PullRequest* requ
       LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());
       m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
     }
-    return m_PullCallback[msgQueue];
+    AsyncPullCallback* asyncPullCallback = m_PullCallback[msgQueue];
+    if (asyncPullCallback && asyncPullCallback->getPullRequest().expired()) {
+      asyncPullCallback->setPullRequest(pullRequest);
+    }
+    return asyncPullCallback;
   }
 
   return NULL;
@@ -727,34 +791,34 @@ void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {
   }
 }
 
-void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) {
-  if (request == NULL) {
-    LOG_ERROR("Pull request is NULL, return");
+void DefaultMQPushConsumer::pullMessageAsync(boost::weak_ptr<PullRequest> 
pullRequest) {
+  boost::shared_ptr<PullRequest> request = pullRequest.lock();
+  if (!request) {
+    LOG_ERROR("Pull request is released, return");
     return;
   }
-  if (request->isDroped()) {
+  if (request->isDropped()) {
     LOG_WARN("Pull request is set drop with mq:%s, return", 
(request->m_messageQueue).toString().c_str());
-    request->removePullMsgEvent();
     return;
   }
-
   MQMessageQueue& messageQueue = request->m_messageQueue;
   if (m_consumerService->getConsumeMsgSerivceListenerType() == 
messageListenerOrderly) {
     if (!request->isLocked() || request->isLockExpired()) {
       if (!m_pRebalance->lock(messageQueue)) {
-        producePullMsgTask(request);
+        request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+        // Retry later.
+        producePullMsgTaskLater(request, 1000);
         return;
       }
     }
   }
 
   if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
-    // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
-    // than:%d",  (request->m_messageQueue).toString().c_str(),
-    // request->getCacheMsgCount(), m_maxMsgCacheSize);
-    boost::asio::deadline_timer* t =
-        new boost::asio::deadline_timer(m_async_ioService, 
boost::posix_time::milliseconds(1 * 1000));
-    t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest, 
this, t, request));
+    LOG_INFO("Pull request for [%s] has Cached with %d Messages and The Max 
size is %d, Sleep 3s.",
+             (request->m_messageQueue).toString().c_str(), 
request->getCacheMsgCount(), m_maxMsgCacheSize);
+    request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+    // Retry 3s,
+    producePullMsgTaskLater(request, 3000);
     return;
   }
 
@@ -770,7 +834,10 @@ void DefaultMQPushConsumer::pullMessageAsync(PullRequest* 
request) {
   string subExpression;
   SubscriptionData* pSdata = 
(m_pRebalance->getSubscriptionData(messageQueue.getTopic()));
   if (pSdata == NULL) {
-    producePullMsgTask(request);
+    LOG_INFO("Can not get SubscriptionData of Pull request for [%s], Sleep 
1s.",
+             (request->m_messageQueue).toString().c_str());
+    // Subscribe data error, retry later.
+    producePullMsgTaskLater(request, 1000);
     return;
   }
   subExpression = pSdata->getSubString();
@@ -784,24 +851,39 @@ void DefaultMQPushConsumer::pullMessageAsync(PullRequest* 
request) {
   arg.mq = messageQueue;
   arg.subData = *pSdata;
   arg.pPullWrapper = m_pPullAPIWrapper;
+  if (request->isDropped()) {
+    LOG_WARN("Pull request is set as dropped with mq:%s, return", 
request->m_messageQueue.toString().c_str());
+    return;
+  }
   try {
     request->setLastPullTimestamp(UtilAll::currentTimeMillis());
-    m_pPullAPIWrapper->pullKernelImpl(messageQueue,                            
     // 1
-                                      subExpression,                           
     // 2
-                                      pSdata->getSubVersion(),                 
     // 3
-                                      request->getNextOffset(),                
     // 4
-                                      32,                                      
     // 5
-                                      sysFlag,                                 
     // 6
-                                      commitOffsetValue,                       
     // 7
-                                      1000 * 15,                               
     // 8
-                                      m_asyncPullTimeout,                      
     // 9
-                                      ComMode_ASYNC,                           
     // 10
-                                      getAsyncPullCallBack(request, 
messageQueue),  // 11
-                                      getSessionCredentials(),                 
     // 12
-                                      &arg);                                   
     // 13
+    AsyncPullCallback* pullCallback = getAsyncPullCallBack(request, 
messageQueue);
+    if (pullCallback == NULL) {
+      LOG_WARN("Can not get pull callback for:%s, Maybe this pull request has 
been released.",
+               request->m_messageQueue.toString().c_str());
+      return;
+    }
+    m_pPullAPIWrapper->pullKernelImpl(messageQueue,              // 1
+                                      subExpression,             // 2
+                                      pSdata->getSubVersion(),   // 3
+                                      request->getNextOffset(),  // 4
+                                      32,                        // 5
+                                      sysFlag,                   // 6
+                                      commitOffsetValue,         // 7
+                                      1000 * 15,                 // 8
+                                      m_asyncPullTimeout,        // 9
+                                      ComMode_ASYNC,             // 10
+                                      pullCallback,              // 11
+                                      getSessionCredentials(),   // 12
+                                      &arg);                     // 13
   } catch (MQException& e) {
     LOG_ERROR(e.what());
-    producePullMsgTask(request);
+    if (request->isDropped()) {
+      LOG_WARN("Pull request is set as dropped with mq:%s, return", 
(request->m_messageQueue).toString().c_str());
+      return;
+    }
+    LOG_INFO("Pull %s occur exception, restart 1s  later.", 
(request->m_messageQueue).toString().c_str());
+    producePullMsgTaskLater(request, 1000);
   }
 }
 
@@ -868,10 +950,10 @@ ConsumerRunningInfo* 
DefaultMQPushConsumer::getConsumerRunningInfo() {
   getSubscriptions(result);
   info->setSubscriptionSet(result);
 
-  std::map<MQMessageQueue, PullRequest*> requestTable = 
m_pRebalance->getPullRequestTable();
+  std::map<MQMessageQueue, boost::shared_ptr<PullRequest>> requestTable = 
m_pRebalance->getPullRequestTable();
 
   for (const auto& it : requestTable) {
-    if (!it.second->isDroped()) {
+    if (!it.second->isDropped()) {
       MessageQueue queue((it.first).getTopic(), (it.first).getBrokerName(), 
(it.first).getQueueId());
       ProcessQueueInfo processQueue;
       processQueue.cachedMsgMinOffset = it.second->getCacheMinOffset();
@@ -879,7 +961,7 @@ ConsumerRunningInfo* 
DefaultMQPushConsumer::getConsumerRunningInfo() {
       processQueue.cachedMsgCount = it.second->getCacheMsgCount();
       processQueue.setCommitOffset(
           m_pOffsetStore->readOffset(it.first, MEMORY_FIRST_THEN_STORE, 
getSessionCredentials()));
-      processQueue.setDroped(it.second->isDroped());
+      processQueue.setDroped(it.second->isDropped());
       processQueue.setLocked(it.second->isLocked());
       processQueue.lastLockTimestamp = it.second->getLastLockTimestamp();
       processQueue.lastPullTimestamp = it.second->getLastPullTimestamp();
@@ -892,4 +974,4 @@ ConsumerRunningInfo* 
DefaultMQPushConsumer::getConsumerRunningInfo() {
 }
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/consumer/FindBrokerResult.h b/src/consumer/FindBrokerResult.h
index 789dc91..76ffc9b 100644
--- a/src/consumer/FindBrokerResult.h
+++ b/src/consumer/FindBrokerResult.h
@@ -27,6 +27,6 @@ struct FindBrokerResult {
   std::string brokerAddr;
   bool slave;
 };
-}
+}  // namespace rocketmq
 
 #endif
diff --git a/src/consumer/OffsetStore.cpp b/src/consumer/OffsetStore.cpp
index fc2a5a0..324653d 100644
--- a/src/consumer/OffsetStore.cpp
+++ b/src/consumer/OffsetStore.cpp
@@ -276,7 +276,7 @@ void RemoteBrokerOffsetStore::persist(const MQMessageQueue& 
mq, const SessionCre
     try {
       updateConsumeOffsetToBroker(mq, it->second, session_credentials);
     } catch (MQException& e) {
-      LOG_ERROR("updateConsumeOffsetToBroker error");
+      LOG_ERROR("updateConsumeOffsetToBroker %s ,offset:[%lld] error", 
mq.toString().c_str(), it->second);
     }
   }
 }
@@ -340,4 +340,4 @@ int64 
RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(const MQMessageQueue
   }
 }
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/consumer/OffsetStore.h b/src/consumer/OffsetStore.h
index d10f233..d7ec92b 100644
--- a/src/consumer/OffsetStore.h
+++ b/src/consumer/OffsetStore.h
@@ -101,6 +101,6 @@ class RemoteBrokerOffsetStore : public OffsetStore {
   int64 fetchConsumeOffsetFromBroker(const MQMessageQueue& mq, const 
SessionCredentials& session_credentials);
 };
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif
diff --git a/src/consumer/PullAPIWrapper.cpp b/src/consumer/PullAPIWrapper.cpp
index 46832dd..a7f1156 100644
--- a/src/consumer/PullAPIWrapper.cpp
+++ b/src/consumer/PullAPIWrapper.cpp
@@ -130,4 +130,4 @@ PullResult* PullAPIWrapper::pullKernelImpl(const 
MQMessageQueue& mq,        // 1
   THROW_MQEXCEPTION(MQClientException, "The broker not exist", -1);
 }
 
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/consumer/PullAPIWrapper.h b/src/consumer/PullAPIWrapper.h
index 57c6f0d..29a64fb 100644
--- a/src/consumer/PullAPIWrapper.h
+++ b/src/consumer/PullAPIWrapper.h
@@ -61,6 +61,6 @@ class PullAPIWrapper {
 };
 
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif  //<! _PULLAPIWRAPPER_H_
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index 162be2a..e578840 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -21,14 +21,18 @@ namespace rocketmq {
 //<!***************************************************************************
 const uint64 PullRequest::RebalanceLockInterval = 20 * 1000;
 const uint64 PullRequest::RebalanceLockMaxLiveTime = 30 * 1000;
+/**
+ * If the process queue has not been pulled for more than MAX_PULL_IDLE_TIME, 
we need to mark it as dropped
+ * default 120s
+ */
+const uint64 PullRequest::MAX_PULL_IDLE_TIME = 120 * 1000;
 
 PullRequest::PullRequest(const string& groupname)
-    : m_groupname(groupname),
-      m_nextOffset(0),
-      m_queueOffsetMax(0),
-      m_bDroped(false),
-      m_bLocked(false),
-      m_bPullMsgEventInprogress(false) {}
+    : m_groupname(groupname), m_nextOffset(0), m_queueOffsetMax(0), 
m_bDropped(false), m_bLocked(false) {
+  m_lastLockTimestamp = UtilAll::currentTimeMillis();
+  m_lastPullTimestamp = UtilAll::currentTimeMillis();
+  m_lastConsumeTimestamp = UtilAll::currentTimeMillis();
+}
 
 PullRequest::~PullRequest() {
   m_msgTreeMapTemp.clear();
@@ -40,11 +44,13 @@ PullRequest& PullRequest::operator=(const PullRequest& 
other) {
   if (this != &other) {
     m_groupname = other.m_groupname;
     m_nextOffset = other.m_nextOffset;
-    m_bDroped.store(other.m_bDroped.load());
+    m_bDropped.store(other.m_bDropped.load());
     m_queueOffsetMax = other.m_queueOffsetMax;
     m_messageQueue = other.m_messageQueue;
     m_msgTreeMap = other.m_msgTreeMap;
     m_msgTreeMapTemp = other.m_msgTreeMapTemp;
+    m_lastPullTimestamp = other.m_lastPullTimestamp;
+    m_lastConsumeTimestamp = other.m_lastConsumeTimestamp;
   }
   return *this;
 }
@@ -127,7 +133,7 @@ int64 PullRequest::removeMessage(vector<MQMessageExt>& 
msgs) {
 void PullRequest::clearAllMsgs() {
   boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
 
-  if (isDroped()) {
+  if (isDropped()) {
     LOG_DEBUG("clear m_msgTreeMap as PullRequest had been dropped.");
     m_msgTreeMap.clear();
     m_msgTreeMapTemp.clear();
@@ -143,9 +149,9 @@ void PullRequest::updateQueueMaxOffset(int64 queueOffset) {
   m_queueOffsetMax = queueOffset;
 }
 
-void PullRequest::setDroped(bool droped) {
-  int temp = (droped == true ? 1 : 0);
-  m_bDroped.store(temp);
+void PullRequest::setDropped(bool dropped) {
+  int temp = (dropped == true ? 1 : 0);
+  m_bDropped.store(temp);
   /*
   m_queueOffsetMax = 0;
   m_nextOffset = 0;
@@ -160,8 +166,8 @@ void PullRequest::setDroped(bool droped) {
   */
 }
 
-bool PullRequest::isDroped() const {
-  return m_bDroped.load() == 1;
+bool PullRequest::isDropped() const {
+  return m_bDropped.load() == 1;
 }
 
 int64 PullRequest::getNextOffset() {
@@ -173,6 +179,7 @@ void PullRequest::setLocked(bool Locked) {
   int temp = (Locked == true ? 1 : 0);
   m_bLocked.store(temp);
 }
+
 bool PullRequest::isLocked() const {
   return m_bLocked.load() == 1;
 }
@@ -197,6 +204,16 @@ uint64 PullRequest::getLastPullTimestamp() const {
   return m_lastPullTimestamp;
 }
 
+bool PullRequest::isPullRequestExpired() const {
+  uint64 interval = m_lastPullTimestamp + MAX_PULL_IDLE_TIME;
+  if (interval <= UtilAll::currentTimeMillis()) {
+    LOG_WARN("PullRequest for [%s] has been expired %lld 
ms,m_lastPullTimestamp = %lld ms",
+             m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() - 
interval, m_lastPullTimestamp);
+    return true;
+  }
+  return false;
+}
+
 void PullRequest::setLastConsumeTimestamp(uint64 time) {
   m_lastConsumeTimestamp = time;
 }
@@ -257,18 +274,5 @@ int64 PullRequest::commit() {
   }
 }
 
-void PullRequest::removePullMsgEvent() {
-  m_bPullMsgEventInprogress = false;
-}
-
-bool PullRequest::addPullMsgEvent() {
-  if (m_bPullMsgEventInprogress == false) {
-    m_bPullMsgEventInprogress = true;
-    LOG_INFO("pullRequest with mq :%s set pullMsgEvent", 
m_messageQueue.toString().c_str());
-    return true;
-  }
-  return false;
-}
-
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/consumer/PullRequest.h b/src/consumer/PullRequest.h
index 83b39b8..c414576 100644
--- a/src/consumer/PullRequest.h
+++ b/src/consumer/PullRequest.h
@@ -41,8 +41,8 @@ class PullRequest {
 
   PullRequest& operator=(const PullRequest& other);
 
-  void setDroped(bool droped);
-  bool isDroped() const;
+  void setDropped(bool dropped);
+  bool isDropped() const;
 
   int64 getNextOffset();
   void setNextOffset(int64 nextoffset);
@@ -58,6 +58,7 @@ class PullRequest {
   int64 getLastLockTimestamp() const;
   void setLastPullTimestamp(uint64 time);
   uint64 getLastPullTimestamp() const;
+  bool isPullRequestExpired() const;
   void setLastConsumeTimestamp(uint64 time);
   uint64 getLastConsumeTimestamp() const;
   void setTryUnlockTimes(int time);
@@ -66,19 +67,24 @@ class PullRequest {
   int64 commit();
   void makeMessageToCosumeAgain(vector<MQMessageExt>& msgs);
   boost::timed_mutex& getPullRequestCriticalSection();
-  void removePullMsgEvent();
+  bool removePullMsgEvent(bool force = false);
   bool addPullMsgEvent();
+  /**
+   * Check if there is an in-flight pull request.
+   */
+  bool hasInFlightPullRequest() const;
 
  public:
   MQMessageQueue m_messageQueue;
   static const uint64 RebalanceLockInterval;     // ms
   static const uint64 RebalanceLockMaxLiveTime;  // ms
+  static const uint64 MAX_PULL_IDLE_TIME;        // ms
 
  private:
   string m_groupname;
   int64 m_nextOffset;
   int64 m_queueOffsetMax;
-  boost::atomic<bool> m_bDroped;
+  boost::atomic<bool> m_bDropped;
   boost::atomic<bool> m_bLocked;
   map<int64, MQMessageExt> m_msgTreeMap;
   map<int64, MQMessageExt> m_msgTreeMapTemp;
@@ -88,9 +94,8 @@ class PullRequest {
   uint64 m_lastPullTimestamp;
   uint64 m_lastConsumeTimestamp;
   boost::timed_mutex m_consumeLock;
-  boost::atomic<bool> m_bPullMsgEventInprogress;
 };
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif
diff --git a/src/consumer/PullResult.cpp b/src/consumer/PullResult.cpp
index 5fc2f67..64e60cd 100644
--- a/src/consumer/PullResult.cpp
+++ b/src/consumer/PullResult.cpp
@@ -43,4 +43,4 @@ PullResult::~PullResult() {
 }
 
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/consumer/PullResultExt.h b/src/consumer/PullResultExt.h
index c5141e2..644d4cd 100644
--- a/src/consumer/PullResultExt.h
+++ b/src/consumer/PullResultExt.h
@@ -59,4 +59,4 @@ class PullResultExt : public PullResult {
   MemoryBlock msgMemBlock;
 };
 
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 2f73bd1..0a4c06f 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -32,17 +32,19 @@ Rebalance::Rebalance(MQConsumer* consumer, MQClientFactory* 
pfactory)
 Rebalance::~Rebalance() {
   {
     map<string, SubscriptionData*>::iterator it = m_subscriptionData.begin();
-    for (; it != m_subscriptionData.end(); ++it)
+    for (; it != m_subscriptionData.end(); ++it) {
       deleteAndZero(it->second);
+    }
     m_subscriptionData.clear();
   }
   {
-    MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
-    for (; it != m_requestQueueTable.end(); ++it) {
-      delete it->second;
-      it->second = NULL;
-    }
-    m_requestQueueTable.clear();
+    /*
+  MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
+  for (; it != m_requestQueueTable.end(); ++it) {
+    delete it->second;
+    it->second = NULL;
+  }
+  m_requestQueueTable.clear();*/
   }
   m_topicSubscribeInfoTable.clear();
   m_pConsumer = NULL;
@@ -56,15 +58,21 @@ void Rebalance::doRebalance() {
     map<string, SubscriptionData*>::iterator it = m_subscriptionData.begin();
     for (; it != m_subscriptionData.end(); ++it) {
       string topic = (it->first);
-      LOG_INFO("current topic is:%s", topic.c_str());
+      LOG_DEBUG("current topic is:%s", topic.c_str());
       //<!topic -> mqs
       vector<MQMessageQueue> mqAll;
       if (!getTopicSubscribeInfo(topic, mqAll)) {
         continue;
       }
       if (mqAll.empty()) {
-        if (!UtilAll::startsWith_retry(topic))
-          THROW_MQEXCEPTION(MQClientException, "doRebalance the topic is 
empty", -1);
+        if (!UtilAll::startsWith_retry(topic)) {
+          std::string msg("#doRebalance. mqAll for topic:");
+          msg.append(topic);
+          msg.append(" is empty");
+          LOG_ERROR("Queues to allocate are empty. Msg: %s", msg.c_str());
+          // to check, return error or throw exception
+          THROW_MQEXCEPTION(MQClientException, msg, -1);
+        }
       }
 
       //<!msg model;
@@ -82,30 +90,14 @@ void Rebalance::doRebalance() {
                                             
m_pConsumer->getSessionCredentials());
 
           if (cidAll.empty()) {
-            /*remove the droping pullRequest changes for recovery consume 
fastly
-                from network broken
-                //drop all pullRequest
-                MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
-                for (; it != m_requestQueueTable.end(); ++it)
-                {
-                    if(!(it->second->isDroped()))
-                    {
-                        MQMessageQueue mqtemp = it->first;
-                        it->second->setDroped(true);
-                        removeUnnecessaryMessageQueue(mqtemp);
-                        it->second->clearAllMsgs();//add clear operation to
-            avoid bad
-                state when dropped pullRequest returns normal
-                        LOG_INFO("find consumer failed, drop undropped mq:%s",
-                mqtemp.toString().c_str());
-                    }
-            }*/
-
+            LOG_ERROR("[ERROR] Get empty consumer IDs. Consumer Group: %s, 
Topic: %s",
+                      m_pConsumer->getGroupName().c_str(), topic.c_str());
+            // Should skip this round of re-balance immediately if consumer ID 
set is empty.
             THROW_MQEXCEPTION(MQClientException, "doRebalance the cidAll is 
empty", -1);
           }
           // log
           for (int i = 0; i < (int)cidAll.size(); ++i) {
-            LOG_INFO("client id:%s of topic:%s", cidAll[i].c_str(), 
topic.c_str());
+            LOG_DEBUG("client id:%s of topic:%s", cidAll[i].c_str(), 
topic.c_str());
           }
           //<! sort;
           sort(mqAll.begin(), mqAll.end());
@@ -116,17 +108,34 @@ void Rebalance::doRebalance() {
           try {
             m_pAllocateMQStrategy->allocate(m_pConsumer->getMQClientId(), 
mqAll, cidAll, allocateResult);
           } catch (MQException& e) {
-            THROW_MQEXCEPTION(MQClientException, "allocate error", -1);
+            std::string errMsg("Allocate message queue for ConsumerGroup[");
+            errMsg.append(m_pConsumer->getGroupName());
+            errMsg.append("],Topic[");
+            errMsg.append(topic);
+            errMsg.append("] failed. ");
+            LOG_ERROR(errMsg.c_str());
+            THROW_MQEXCEPTION(MQClientException, errMsg, -1);
           }
 
           // log
           for (int i = 0; i < (int)allocateResult.size(); ++i) {
-            LOG_INFO("allocate mq:%s", allocateResult[i].toString().c_str());
+            LOG_DEBUG("allocate mq:%s", allocateResult[i].toString().c_str());
           }
 
           //<!update local;
           bool changed = updateRequestTableInRebalance(topic, allocateResult);
           if (changed) {
+            std::stringstream ss;
+            ss << "Allocation result for [Consumer Group: " << 
m_pConsumer->getGroupName() << ", Topic: " << topic
+               << ", Current Consumer ID: " << m_pConsumer->getMQClientId() << 
"] is changed.\n "
+               << "Total Queue #: " << mqAll.size() << ", Total Consumer #: " 
<< cidAll.size()
+               << " Allocated Queues are: \n";
+
+            for (vector<MQMessageQueue>::size_type i = 0; i < 
allocateResult.size(); ++i) {
+              ss << allocateResult[i].toString() << "\n";
+            }
+            // Log allocation result.
+            LOG_INFO(ss.str().c_str());
             messageQueueChanged(topic, mqAll, allocateResult);
             break;
           }
@@ -148,7 +157,7 @@ void Rebalance::persistConsumerOffset() {
     boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
     MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
     for (; it != m_requestQueueTable.end(); ++it) {
-      if (it->second && (!it->second->isDroped())) {
+      if (it->second && (!it->second->isDropped())) {
         mqs.push_back(it->first);
       }
     }
@@ -173,7 +182,7 @@ void Rebalance::persistConsumerOffsetByResetOffset() {
     MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
     for (; it != m_requestQueueTable.end(); ++it) {
       if (it->second) {  // even if it was dropped, also need update offset 
when
-                         // rcv resetOffset cmd
+        // rcv resetOffset cmd
         mqs.push_back(it->first);
       }
     }
@@ -225,29 +234,42 @@ bool Rebalance::getTopicSubscribeInfo(const string& 
topic, vector<MQMessageQueue
   return false;
 }
 
-void Rebalance::addPullRequest(MQMessageQueue mq, PullRequest* pPullRequest) {
+void Rebalance::addPullRequest(MQMessageQueue mq, 
boost::shared_ptr<PullRequest> pPullRequest) {
   boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
   m_requestQueueTable[mq] = pPullRequest;
 }
 
-PullRequest* Rebalance::getPullRequest(MQMessageQueue mq) {
+void Rebalance::removePullRequest(MQMessageQueue mq) {
+  boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
+  if (m_requestQueueTable.find(mq) != m_requestQueueTable.end()) {
+    m_requestQueueTable.erase(mq);
+  }
+}
+bool Rebalance::isPullRequestExist(MQMessageQueue mq) {
+  boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
+  if (m_requestQueueTable.find(mq) != m_requestQueueTable.end()) {
+    return true;
+  }
+  return false;
+}
+boost::weak_ptr<PullRequest> Rebalance::getPullRequest(MQMessageQueue mq) {
   boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
   if (m_requestQueueTable.find(mq) != m_requestQueueTable.end()) {
     return m_requestQueueTable[mq];
   }
-  return NULL;
+  return boost::weak_ptr<PullRequest>();
 }
 
-map<MQMessageQueue, PullRequest*> Rebalance::getPullRequestTable() {
+map<MQMessageQueue, boost::shared_ptr<PullRequest>> 
Rebalance::getPullRequestTable() {
   boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
   return m_requestQueueTable;
 }
 
-void Rebalance::unlockAll(bool oneway) {
+void Rebalance::unlockAll(bool oneWay) {
   map<string, vector<MQMessageQueue>*> brokerMqs;
   MQ2PULLREQ requestQueueTable = getPullRequestTable();
   for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != 
requestQueueTable.end(); ++it) {
-    if (!(it->second->isDroped())) {
+    if (!(it->second->isDropped())) {
       if (brokerMqs.find(it->first.getBrokerName()) == brokerMqs.end()) {
         vector<MQMessageQueue>* mqs = new vector<MQMessageQueue>;
         brokerMqs[it->first.getBrokerName()] = mqs;
@@ -274,10 +296,10 @@ void Rebalance::unlockAll(bool oneway) {
       
m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(pFindBrokerResult->brokerAddr,
 unlockBatchRequest.get(),
                                                             1000, 
m_pConsumer->getSessionCredentials());
       for (unsigned int i = 0; i != mqs.size(); ++i) {
-        PullRequest* pullreq = getPullRequest(mqs[i]);
-        if (pullreq) {
+        boost::weak_ptr<PullRequest> pullreq = getPullRequest(mqs[i]);
+        if (!pullreq.expired()) {
           LOG_INFO("unlockBatchMQ success of mq:%s", 
mqs[i].toString().c_str());
-          pullreq->setLocked(false);
+          pullreq.lock()->setLocked(false);
         } else {
           LOG_ERROR("unlockBatchMQ fails of mq:%s", mqs[i].toString().c_str());
         }
@@ -308,10 +330,10 @@ void Rebalance::unlock(MQMessageQueue mq) {
     
m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(pFindBrokerResult->brokerAddr,
 unlockBatchRequest.get(), 1000,
                                                           
m_pConsumer->getSessionCredentials());
     for (unsigned int i = 0; i != mqs.size(); ++i) {
-      PullRequest* pullreq = getPullRequest(mqs[i]);
-      if (pullreq) {
+      boost::weak_ptr<PullRequest> pullreq = getPullRequest(mqs[i]);
+      if (!pullreq.expired()) {
         LOG_INFO("unlock success of mq:%s", mqs[i].toString().c_str());
-        pullreq->setLocked(false);
+        pullreq.lock()->setLocked(false);
       } else {
         LOG_ERROR("unlock fails of mq:%s", mqs[i].toString().c_str());
       }
@@ -325,7 +347,7 @@ void Rebalance::lockAll() {
   map<string, vector<MQMessageQueue>*> brokerMqs;
   MQ2PULLREQ requestQueueTable = getPullRequestTable();
   for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != 
requestQueueTable.end(); ++it) {
-    if (!(it->second->isDroped())) {
+    if (!(it->second->isDropped())) {
       string brokerKey = it->first.getBrokerName() + it->first.getTopic();
       if (brokerMqs.find(brokerKey) == brokerMqs.end()) {
         vector<MQMessageQueue>* mqs = new vector<MQMessageQueue>;
@@ -355,11 +377,11 @@ void Rebalance::lockAll() {
       
m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(pFindBrokerResult->brokerAddr,
 lockBatchRequest.get(),
                                                           messageQueues, 1000, 
m_pConsumer->getSessionCredentials());
       for (unsigned int i = 0; i != messageQueues.size(); ++i) {
-        PullRequest* pullreq = getPullRequest(messageQueues[i]);
-        if (pullreq) {
+        boost::weak_ptr<PullRequest> pullreq = 
getPullRequest(messageQueues[i]);
+        if (!pullreq.expired()) {
           LOG_INFO("lockBatchMQ success of mq:%s", 
messageQueues[i].toString().c_str());
-          pullreq->setLocked(true);
-          pullreq->setLastLockTimestamp(UtilAll::currentTimeMillis());
+          pullreq.lock()->setLocked(true);
+          pullreq.lock()->setLastLockTimestamp(UtilAll::currentTimeMillis());
         } else {
           LOG_ERROR("lockBatchMQ fails of mq:%s", 
messageQueues[i].toString().c_str());
         }
@@ -372,6 +394,7 @@ void Rebalance::lockAll() {
   }
   brokerMqs.clear();
 }
+
 bool Rebalance::lock(MQMessageQueue mq) {
   unique_ptr<FindBrokerResult> pFindBrokerResult(
       m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), 
MASTER_ID, true));
@@ -397,11 +420,11 @@ bool Rebalance::lock(MQMessageQueue mq) {
       return false;
     }
     for (unsigned int i = 0; i != messageQueues.size(); ++i) {
-      PullRequest* pullreq = getPullRequest(messageQueues[i]);
-      if (pullreq) {
+      boost::weak_ptr<PullRequest> pullreq = getPullRequest(messageQueues[i]);
+      if (!pullreq.expired()) {
         LOG_INFO("lock success of mq:%s", messageQueues[i].toString().c_str());
-        pullreq->setLocked(true);
-        pullreq->setLastLockTimestamp(UtilAll::currentTimeMillis());
+        pullreq.lock()->setLocked(true);
+        pullreq.lock()->setLastLockTimestamp(UtilAll::currentTimeMillis());
         lockResult = true;
       } else {
         LOG_ERROR("lock fails of mq:%s", messageQueues[i].toString().c_str());
@@ -436,113 +459,105 @@ void RebalancePull::removeUnnecessaryMessageQueue(const 
MQMessageQueue& mq) {}
 RebalancePush::RebalancePush(MQConsumer* consumer, MQClientFactory* pfactory) 
: Rebalance(consumer, pfactory) {}
 
 bool RebalancePush::updateRequestTableInRebalance(const string& topic, 
vector<MQMessageQueue>& mqsSelf) {
-  LOG_DEBUG("updateRequestTableInRebalance Enter");
+  LOG_DEBUG("updateRequestTableInRebalance for Topic[%s] Enter", 
topic.c_str());
+
+  // 1. Clear no in charge of
+  //   1. set dropped
+  //   2. clear local message
+  //   3. clear offset
+  //   4. remove request table
+  //   5. set flag for route changed
+  // 2. Check and clear dropped/invalid pullrequest(timeout and so on)
+  // 3. Add new mq in charge of
+  //   1. new pullrequest
+  //   2. init next pull offset
+  //   3. int  offset
+  //   4. add request table
+  //   5. set flag for route changed
+  // 4. Start long pull for request
   if (mqsSelf.empty()) {
     LOG_WARN("allocated queue is empty for topic:%s", topic.c_str());
   }
 
   bool changed = false;
 
-  //<!remove
+  //<!remove none responsive mq
   MQ2PULLREQ requestQueueTable(getPullRequestTable());
-  MQ2PULLREQ::iterator it = requestQueueTable.begin();
-  for (; it != requestQueueTable.end(); ++it) {
-    MQMessageQueue mqtemp = it->first;
+  MQ2PULLREQ::iterator itDel = requestQueueTable.begin();
+  for (; itDel != requestQueueTable.end(); ++itDel) {
+    MQMessageQueue mqtemp = itDel->first;
     if (mqtemp.getTopic().compare(topic) == 0) {
-      if (mqsSelf.empty() || (find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == 
mqsSelf.end())) {
-        if (!(it->second->isDroped())) {
-          it->second->setDroped(true);
-          // delete the lastest pull request for this mq, which hasn't been 
response
-          // m_pClientFactory->removeDropedPullRequestOpaque(it->second);
-          removeUnnecessaryMessageQueue(mqtemp);
-          it->second->clearAllMsgs();  // add clear operation to avoid bad 
state
-                                       // when dropped pullRequest returns
-                                       // normal
-          LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
-        }
+      if (mqsSelf.empty() || (std::find(mqsSelf.begin(), mqsSelf.end(), 
mqtemp) == mqsSelf.end())) {
+        // if not response , set to dropped
+        LOG_INFO("Drop mq:%s,because not responsive", 
mqtemp.toString().c_str());
+        itDel->second->setDropped(true);
+        // remove offset table to avoid offset backup
+        removeUnnecessaryMessageQueue(mqtemp);
+        itDel->second->clearAllMsgs();
+        removePullRequest(mqtemp);
+        changed = true;
+      } else if (itDel->second->isPullRequestExpired()) {
+        // if pull expired , set to dropped, eg: if add pull task error, the 
pull request will be expired.
+        LOG_INFO("Drop mq:%s according Pull timeout.", 
mqtemp.toString().c_str());
+        itDel->second->setDropped(true);
+        removeUnnecessaryMessageQueue(mqtemp);
+        itDel->second->clearAllMsgs();
+        removePullRequest(mqtemp);
         changed = true;
       }
     }
   }
 
-  //<!add
-  vector<PullRequest*> pullrequestAdd;
-  DefaultMQPushConsumer* pConsumer = 
static_cast<DefaultMQPushConsumer*>(m_pConsumer);
-  vector<MQMessageQueue>::iterator it2 = mqsSelf.begin();
-  for (; it2 != mqsSelf.end(); ++it2) {
-    PullRequest* pPullRequest(getPullRequest(*it2));
-    if (pPullRequest && pPullRequest->isDroped()) {
-      LOG_DEBUG(
-          "before resume the pull handle of this pullRequest, its mq is:%s, "
-          "its offset is:%lld",
-          (it2->toString()).c_str(), pPullRequest->getNextOffset());
-      pConsumer->getOffsetStore()->removeOffset(*it2);  // remove dirty offset 
which maybe update to
-                                                        // 
OffsetStore::m_offsetTable by consuming After last
-                                                        // drop
-      int64 nextOffset = computePullFromWhere(*it2);
-      if (nextOffset >= 0) {
-        /*
-          Fix issue with following scenario:
-          1. pullRequest was dropped
-          2. the pullMsgEvent was not executed by taskQueue, so the 
PullMsgEvent
-          was not stop
-          3. pullReuest was resumed by next doRebalance, then mulitple
-          pullMsgEvent were produced for pullRequest
-        */
-        bool bPullMsgEvent = pPullRequest->addPullMsgEvent();
-        while (!bPullMsgEvent) {
-          boost::this_thread::sleep_for(boost::chrono::milliseconds(50));
-          LOG_INFO("pullRequest with mq :%s has unfinished pullMsgEvent", 
(it2->toString()).c_str());
-          bPullMsgEvent = pPullRequest->addPullMsgEvent();
-        }
-        pPullRequest->setDroped(false);
-        pPullRequest->clearAllMsgs();  // avoid consume accumulation and 
consume
-                                       // dumplication issues
-        pPullRequest->setNextOffset(nextOffset);
-        pPullRequest->updateQueueMaxOffset(nextOffset);
-        LOG_INFO(
-            "after resume the pull handle of this pullRequest, its mq is:%s, "
-            "its offset is:%lld",
-            (it2->toString()).c_str(), pPullRequest->getNextOffset());
-        changed = true;
-        pConsumer->producePullMsgTask(pPullRequest);
-      } else {
-        LOG_ERROR("get fatel error QueryOffset of mq:%s, do not reconsume this 
queue", (it2->toString()).c_str());
-      }
+  //<!add check new mq added.
+  vector<boost::shared_ptr<PullRequest>> pullRequestsToAdd;
+  vector<MQMessageQueue>::iterator itAdd = mqsSelf.begin();
+  for (; itAdd != mqsSelf.end(); ++itAdd) {
+    if (isPullRequestExist(*itAdd)) {
+      // have check the expired pull request, re-add it.
+      continue;
     }
-
-    if (!pPullRequest) {
-      LOG_INFO("updateRequestTableInRebalance Doesn't find old mq");
-      PullRequest* pullRequest = new PullRequest(m_pConsumer->getGroupName());
-      pullRequest->m_messageQueue = *it2;
-
-      int64 nextOffset = computePullFromWhere(*it2);
-      if (nextOffset >= 0) {
-        pullRequest->setNextOffset(nextOffset);
-        pullRequest->clearAllMsgs();  // avoid consume accumulation and consume
-                                      // dumplication issues
-        changed = true;
-        //<! mq-> pq;
-        addPullRequest(*it2, pullRequest);
-        pullrequestAdd.push_back(pullRequest);
-        LOG_INFO("add mq:%s, request initiall offset:%lld", 
(*it2).toString().c_str(), nextOffset);
-      }
+    boost::shared_ptr<PullRequest> pullRequest = 
boost::make_shared<PullRequest>(m_pConsumer->getGroupName());
+    pullRequest->m_messageQueue = *itAdd;
+    int64 nextOffset = computePullFromWhere(*itAdd);
+    if (nextOffset >= 0) {
+      pullRequest->setNextOffset(nextOffset);
+      changed = true;
+      addPullRequest(*itAdd, pullRequest);
+      pullRequestsToAdd.push_back(pullRequest);
+      LOG_INFO("Add mq:%s, request initial offset:%ld", 
(*itAdd).toString().c_str(), nextOffset);
+    } else {
+      LOG_WARN(
+          "Failed to add pull request for %s due to failure of querying 
consume offset, request initial offset:%ld",
+          (*itAdd).toString().c_str(), nextOffset);
     }
   }
 
-  vector<PullRequest*>::iterator it3 = pullrequestAdd.begin();
-  for (; it3 != pullrequestAdd.end(); ++it3) {
-    LOG_DEBUG("start pull request");
-    pConsumer->producePullMsgTask(*it3);
+  for (vector<boost::shared_ptr<PullRequest>>::iterator itAdded = 
pullRequestsToAdd.begin();
+       itAdded != pullRequestsToAdd.end(); ++itAdded) {
+    LOG_INFO("Start to pull %s, offset:%ld, GroupName %s", 
(*itAdded)->m_messageQueue.toString().c_str(),
+             (*itAdded)->getNextOffset(), (*itAdded)->getGroupName().c_str());
+    if (!m_pConsumer->producePullMsgTask(*itAdded)) {
+      LOG_WARN(
+          "Failed to producer pull message task for %s, Remove it from Request 
table and wait for next #Rebalance.",
+          (*itAdded)->m_messageQueue.toString().c_str());
+      // remove from request table, and wait for next rebalance.
+      (*itAdded)->setDropped(true);
+      removePullRequest((*itAdded)->m_messageQueue);
+    }
   }
 
-  LOG_DEBUG("updateRequestTableInRebalance exit");
+  LOG_DEBUG("updateRequestTableInRebalance Topic[%s] exit", topic.c_str());
   return changed;
 }
 
 int64 RebalancePush::computePullFromWhere(const MQMessageQueue& mq) {
   int64 result = -1;
-  DefaultMQPushConsumer* pConsumer = 
static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+  DefaultMQPushConsumer* pConsumer = 
dynamic_cast<DefaultMQPushConsumer*>(m_pConsumer);
+  if (!pConsumer) {
+    LOG_ERROR("Cast consumer pointer to DefaultMQPushConsumer pointer failed 
when computePullFromWhere %s",
+              mq.toString().c_str());
+    return result;
+  }
   ConsumeFromWhere consumeFromWhere = pConsumer->getConsumeFromWhere();
   OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
   switch (consumeFromWhere) {
@@ -623,7 +638,12 @@ void RebalancePush::messageQueueChanged(const string& 
topic,
                                         vector<MQMessageQueue>& mqDivided) {}
 
 void RebalancePush::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {
-  DefaultMQPushConsumer* pConsumer = 
static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+  // DefaultMQPushConsumer *pConsumer = static_cast<DefaultMQPushConsumer 
*>(m_pConsumer);
+  DefaultMQPushConsumer* pConsumer = 
dynamic_cast<DefaultMQPushConsumer*>(m_pConsumer);
+  if (!pConsumer) {
+    LOG_ERROR("Cast  MQConsumer* to DefaultMQPushConsumer* failed when remove 
%s", mq.toString().c_str());
+    return;
+  }
   OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
 
   pOffsetStore->persist(mq, m_pConsumer->getSessionCredentials());
diff --git a/src/consumer/Rebalance.h b/src/consumer/Rebalance.h
index 8d8a9ae..fe39569 100644
--- a/src/consumer/Rebalance.h
+++ b/src/consumer/Rebalance.h
@@ -24,14 +24,17 @@
 #include "PullRequest.h"
 #include "SubscriptionData.h"
 
+#include <boost/smart_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 
 namespace rocketmq {
 class MQClientFactory;
+
 //<!************************************************************************
 class Rebalance {
  public:
   Rebalance(MQConsumer*, MQClientFactory*);
+
   virtual ~Rebalance();
 
   virtual void messageQueueChanged(const string& topic,
@@ -46,24 +49,36 @@ class Rebalance {
 
  public:
   void doRebalance();
+
   void persistConsumerOffset();
+
   void persistConsumerOffsetByResetOffset();
+
   //<!m_subscriptionInner;
   SubscriptionData* getSubscriptionData(const string& topic);
+
   void setSubscriptionData(const string& topic, SubscriptionData* pdata);
 
   map<string, SubscriptionData*>& getSubscriptionInner();
 
   //<!m_topicSubscribeInfoTable;
   void setTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& mqs);
+
   bool getTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& mqs);
 
-  void addPullRequest(MQMessageQueue mq, PullRequest* pPullRequest);
-  PullRequest* getPullRequest(MQMessageQueue mq);
-  map<MQMessageQueue, PullRequest*> getPullRequestTable();
+  void addPullRequest(MQMessageQueue mq, boost::shared_ptr<PullRequest> 
pPullRequest);
+  void removePullRequest(MQMessageQueue mq);
+  bool isPullRequestExist(MQMessageQueue mq);
+  boost::weak_ptr<PullRequest> getPullRequest(MQMessageQueue mq);
+
+  map<MQMessageQueue, boost::shared_ptr<PullRequest>> getPullRequestTable();
+
   void lockAll();
+
   bool lock(MQMessageQueue mq);
-  void unlockAll(bool oneway = false);
+
+  void unlockAll(bool oneWay = false);
+
   void unlock(MQMessageQueue mq);
 
  protected:
@@ -71,7 +86,7 @@ class Rebalance {
 
   boost::mutex m_topicSubscribeInfoTableMutex;
   map<string, vector<MQMessageQueue>> m_topicSubscribeInfoTable;
-  typedef map<MQMessageQueue, PullRequest*> MQ2PULLREQ;
+  typedef map<MQMessageQueue, boost::shared_ptr<PullRequest>> MQ2PULLREQ;
   MQ2PULLREQ m_requestQueueTable;
   boost::mutex m_requestTableMutex;
 
@@ -84,6 +99,7 @@ class Rebalance {
 class RebalancePull : public Rebalance {
  public:
   RebalancePull(MQConsumer*, MQClientFactory*);
+
   virtual ~RebalancePull(){};
 
   virtual void messageQueueChanged(const string& topic,
@@ -101,6 +117,7 @@ class RebalancePull : public Rebalance {
 class RebalancePush : public Rebalance {
  public:
   RebalancePush(MQConsumer*, MQClientFactory*);
+
   virtual ~RebalancePush(){};
 
   virtual void messageQueueChanged(const string& topic,
@@ -115,6 +132,6 @@ class RebalancePush : public Rebalance {
 };
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif
diff --git a/src/consumer/SubscriptionData.cpp 
b/src/consumer/SubscriptionData.cpp
index e771b52..433fce1 100644
--- a/src/consumer/SubscriptionData.cpp
+++ b/src/consumer/SubscriptionData.cpp
@@ -18,8 +18,8 @@
 #include <algorithm>
 #include <sstream>
 #include <vector>
-#include "UtilAll.h"
 #include "Logging.h"
+#include "UtilAll.h"
 namespace rocketmq {
 //<!************************************************************************
 SubscriptionData::SubscriptionData() {
@@ -127,4 +127,4 @@ Json::Value SubscriptionData::toJson() const {
 }
 
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/consumer/SubscriptionData.h b/src/consumer/SubscriptionData.h
index 89be74f..40e985b 100644
--- a/src/consumer/SubscriptionData.h
+++ b/src/consumer/SubscriptionData.h
@@ -57,6 +57,6 @@ class SubscriptionData {
   vector<int> m_codeSet;
 };
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif

Reply via email to