This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 5629226  [ISSUE #293] reset pull call back by new pull request event 
time
5629226 is described below

commit 5629226997fea8bc6ac91e9481d2140d983106a7
Author: dinglei <[email protected]>
AuthorDate: Fri Apr 17 10:23:47 2020 +0800

    [ISSUE #293] reset pull call back by new pull request event time
    
    [ISSUE #293] reset pull call back by new pull request event time
---
 src/consumer/DefaultMQPushConsumerImpl.cpp | 3 ++-
 src/consumer/PullRequest.cpp               | 3 ++-
 src/consumer/Rebalance.cpp                 | 1 +
 3 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp 
b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 12fddd3..39b3ccd 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -790,7 +790,8 @@ AsyncPullCallback* 
DefaultMQPushConsumerImpl::getAsyncPullCallBack(boost::weak_p
       m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
     }
     AsyncPullCallback* asyncPullCallback = m_PullCallback[msgQueue];
-    if (asyncPullCallback && asyncPullCallback->getPullRequest().expired()) {
+    if (asyncPullCallback) {
+      // maybe the pull request has dropped before, replace event time.
       asyncPullCallback->setPullRequest(pullRequest);
     }
     return asyncPullCallback;
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index e578840..9795880 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -208,7 +208,8 @@ bool PullRequest::isPullRequestExpired() const {
   uint64 interval = m_lastPullTimestamp + MAX_PULL_IDLE_TIME;
   if (interval <= UtilAll::currentTimeMillis()) {
     LOG_WARN("PullRequest for [%s] has been expired %lld 
ms,m_lastPullTimestamp = %lld ms",
-             m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() - 
interval, m_lastPullTimestamp);
+             m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() - 
m_lastPullTimestamp,
+             m_lastPullTimestamp);
     return true;
   }
   return false;
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 6dbb35c..18c8b2f 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -522,6 +522,7 @@ bool RebalancePush::updateRequestTableInRebalance(const 
string& topic, vector<MQ
     int64 nextOffset = computePullFromWhere(*itAdd);
     if (nextOffset >= 0) {
       pullRequest->setNextOffset(nextOffset);
+      pullRequest->setDropped(false);
       changed = true;
       addPullRequest(*itAdd, pullRequest);
       pullRequestsToAdd.push_back(pullRequest);

Reply via email to