This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new bc5adc5 refactor(rebalance): use smart_ptr to manage pull request
(#206)
bc5adc5 is described below
commit bc5adc57eda7145486b0a8851f65daab2530d9bb
Author: dinglei <[email protected]>
AuthorDate: Wed Dec 11 15:45:26 2019 +0800
refactor(rebalance): use smart_ptr to manage pull request (#206)
refactor(rebalance): use smart_ptr to manage pull request
---
include/DefaultMQPullConsumer.h | 15 +-
include/DefaultMQPushConsumer.h | 14 +-
include/MQConsumer.h | 2 +-
src/MQClientFactory.cpp | 8 +-
src/consumer/AllocateMQStrategy.h | 2 +-
src/consumer/ConsumeMessageConcurrentlyService.cpp | 33 +-
src/consumer/ConsumeMessageOrderlyService.cpp | 33 +-
src/consumer/ConsumeMsgService.h | 16 +-
src/consumer/DefaultMQPullConsumer.cpp | 6 +-
src/consumer/DefaultMQPushConsumer.cpp | 458 ++++++++++++---------
src/consumer/FindBrokerResult.h | 2 +-
src/consumer/OffsetStore.cpp | 4 +-
src/consumer/OffsetStore.h | 2 +-
src/consumer/PullAPIWrapper.cpp | 2 +-
src/consumer/PullAPIWrapper.h | 2 +-
src/consumer/PullRequest.cpp | 58 +--
src/consumer/PullRequest.h | 17 +-
src/consumer/PullResult.cpp | 2 +-
src/consumer/PullResultExt.h | 2 +-
src/consumer/Rebalance.cpp | 298 +++++++-------
src/consumer/Rebalance.h | 29 +-
src/consumer/SubscriptionData.cpp | 4 +-
src/consumer/SubscriptionData.h | 2 +-
23 files changed, 588 insertions(+), 423 deletions(-)
diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
index 4f6ef92..af01941 100644
--- a/include/DefaultMQPullConsumer.h
+++ b/include/DefaultMQPullConsumer.h
@@ -54,7 +54,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public
MQConsumer {
virtual void getSubscriptions(std::vector<SubscriptionData>&);
virtual void updateConsumeOffset(const MQMessageQueue& mq, int64 offset);
virtual void removeConsumeOffset(const MQMessageQueue& mq);
- virtual void producePullMsgTask(PullRequest*);
+ virtual bool producePullMsgTask(boost::weak_ptr<PullRequest> pullRequest);
virtual Rebalance* getRebalance() const;
//<!end MQConsumer;
@@ -67,7 +67,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public
MQConsumer {
* @param subExpression
* set filter expression for pulled msg, broker will filter msg
actively
* Now only OR operation is supported, eg: "tag1 || tag2 || tag3"
- * if subExpression is setted to "null" or "*"，all msg will be
subscribed
+ * if subExpression is setted to "null" or "*", all msg will be
subscribed
* @param offset
* specify the started pull offset
* @param maxNums
@@ -90,7 +90,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public
MQConsumer {
* @param subExpression
* set filter expression for pulled msg, broker will filter msg
actively
* Now only OR operation is supported, eg: "tag1 || tag2 || tag3"
- * if subExpression is setted to "null" or "*"，all msg will be
subscribed
+ * if subExpression is setted to "null" or "*", all msg will be
subscribed
* @param offset
* specify the started pull offset
* @param maxNums
@@ -107,20 +107,13 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public
MQConsumer {
virtual ConsumerRunningInfo* getConsumerRunningInfo() { return NULL; }
/**
- * 获取消费进度，返回-1表示出错
*
* @param mq
* @param fromStore
* @return
*/
int64 fetchConsumeOffset(const MQMessageQueue& mq, bool fromStore);
- /**
- * 根据topic获取MessageQueue，以均衡方式在多个成员之间分配
- *
- * @param topic
- * 消息Topic
- * @return 返回队列集合
- */
+
void fetchMessageQueuesInBalance(const std::string& topic,
std::vector<MQMessageQueue> mqs);
// temp persist consumer offset interface, only valid with
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index 61fcde1..b6de085 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -86,13 +86,17 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public
MQConsumer {
virtual Rebalance* getRebalance() const;
ConsumeMsgService* getConsumerMsgService() const;
- virtual void producePullMsgTask(PullRequest*);
- void triggerNextPullRequest(boost::asio::deadline_timer* t, PullRequest*
request);
+ virtual bool producePullMsgTask(boost::weak_ptr<PullRequest>);
+ virtual bool producePullMsgTaskLater(boost::weak_ptr<PullRequest>, int
millis);
+ static void static_triggerNextPullRequest(void* context,
+ boost::asio::deadline_timer* t,
+ boost::weak_ptr<PullRequest>);
+ void triggerNextPullRequest(boost::asio::deadline_timer* t,
boost::weak_ptr<PullRequest>);
void runPullMsgQueue(TaskQueue* pTaskQueue);
- void pullMessage(PullRequest* pullrequest); // sync pullMsg
- void pullMessageAsync(PullRequest* pullrequest); // async pullMsg
+ void pullMessage(boost::weak_ptr<PullRequest> pullrequest); // sync
pullMsg
+ void pullMessageAsync(boost::weak_ptr<PullRequest> pullrequest); // async
pullMsg
void setAsyncPull(bool asyncFlag);
- AsyncPullCallback* getAsyncPullCallBack(PullRequest* request, MQMessageQueue
msgQueue);
+ AsyncPullCallback* getAsyncPullCallBack(boost::weak_ptr<PullRequest>,
MQMessageQueue msgQueue);
void shutdownAsyncPullCallBack();
/*
diff --git a/include/MQConsumer.h b/include/MQConsumer.h
index cc25327..b6cd613 100644
--- a/include/MQConsumer.h
+++ b/include/MQConsumer.h
@@ -43,7 +43,7 @@ class ROCKETMQCLIENT_API MQConsumer : public MQClient {
virtual ConsumeType getConsumeType() = 0;
virtual ConsumeFromWhere getConsumeFromWhere() = 0;
virtual void getSubscriptions(std::vector<SubscriptionData>&) = 0;
- virtual void producePullMsgTask(PullRequest*) = 0;
+ virtual bool producePullMsgTask(boost::weak_ptr<PullRequest>) = 0;
virtual Rebalance* getRebalance() const = 0;
virtual PullResult pull(const MQMessageQueue& mq, const std::string&
subExpression, int64 offset, int maxNums) = 0;
virtual void pull(const MQMessageQueue& mq,
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 99c4ffd..03d4640 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -1040,10 +1040,12 @@ void MQClientFactory::resetOffset(const string& group,
for (; it != offsetTable.end(); ++it) {
MQMessageQueue mq = it->first;
- PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
+ boost::weak_ptr<PullRequest> pullRequest =
pConsumer->getRebalance()->getPullRequest(mq);
+ boost::shared_ptr<PullRequest> pullreq = pullRequest.lock();
+ // PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
if (pullreq) {
- pullreq->setDroped(true);
- LOG_INFO("resetOffset setDroped for mq:%s", mq.toString().data());
+ pullreq->setDropped(true);
+ LOG_INFO("resetOffset setDropped for mq:%s", mq.toString().data());
pullreq->clearAllMsgs();
pullreq->updateQueueMaxOffset(it->second);
} else {
diff --git a/src/consumer/AllocateMQStrategy.h
b/src/consumer/AllocateMQStrategy.h
index 4613040..e24966c 100644
--- a/src/consumer/AllocateMQStrategy.h
+++ b/src/consumer/AllocateMQStrategy.h
@@ -92,5 +92,5 @@ class AllocateMQAveragely : public AllocateMQStrategy {
};
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
#endif
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp
b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 9069350..e5df16e 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -60,19 +60,35 @@ MessageListenerType
ConsumeMessageConcurrentlyService::getConsumeMsgSerivceListe
return m_pMessageListener->getMessageListenerType();
}
-void ConsumeMessageConcurrentlyService::submitConsumeRequest(PullRequest*
request, vector<MQMessageExt>& msgs) {
+void
ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<PullRequest>
pullRequest,
+
vector<MQMessageExt>& msgs) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_WARN("Pull request has been released");
+ return;
+ }
+ if (request->isDropped()) {
+ LOG_INFO("Pull request for %s is dropped, which will be released in next
re-balance.",
+ request->m_messageQueue.toString().c_str());
+ return;
+ }
m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest,
this, request, msgs));
}
-void ConsumeMessageConcurrentlyService::ConsumeRequest(PullRequest* request,
vector<MQMessageExt>& msgs) {
- if (!request || request->isDroped()) {
- LOG_WARN("the pull result is NULL or Had been dropped");
+void
ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullRequest>
pullRequest,
+ vector<MQMessageExt>&
msgs) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_WARN("Pull request has been released");
+ return;
+ }
+ if (!request || request->isDropped()) {
+ LOG_WARN("the pull request had been dropped");
request->clearAllMsgs(); // add clear operation to avoid bad state when
// dropped pullRequest returns normal
return;
}
- //<!��ȡ����;
if (msgs.empty()) {
LOG_WARN("the msg of pull result is NULL,its mq:%s",
(request->m_messageQueue).toString().c_str());
return;
@@ -123,12 +139,11 @@ void
ConsumeMessageConcurrentlyService::ConsumeRequest(PullRequest* request, vec
// update offset
int64 offset = request->removeMessage(msgs);
- // LOG_DEBUG("update offset:%lld of mq: %s",
- // offset,(request->m_messageQueue).toString().c_str());
if (offset >= 0) {
m_pConsumer->updateConsumeOffset(request->m_messageQueue, offset);
} else {
- LOG_WARN("Note: accumulation consume occurs on mq:%s",
(request->m_messageQueue).toString().c_str());
+ LOG_WARN("Note: Get local offset for mq:%s failed, may be it is updated
before. skip..",
+ (request->m_messageQueue).toString().c_str());
}
}
@@ -144,4 +159,4 @@ void
ConsumeMessageConcurrentlyService::resetRetryTopic(vector<MQMessageExt>& ms
}
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/consumer/ConsumeMessageOrderlyService.cpp
b/src/consumer/ConsumeMessageOrderlyService.cpp
index 800bb4d..f68fe44 100644
--- a/src/consumer/ConsumeMessageOrderlyService.cpp
+++ b/src/consumer/ConsumeMessageOrderlyService.cpp
@@ -103,14 +103,25 @@ MessageListenerType
ConsumeMessageOrderlyService::getConsumeMsgSerivceListenerTy
return m_pMessageListener->getMessageListenerType();
}
-void ConsumeMessageOrderlyService::submitConsumeRequest(PullRequest* request,
vector<MQMessageExt>& msgs) {
+void
ConsumeMessageOrderlyService::submitConsumeRequest(boost::weak_ptr<PullRequest>
pullRequest,
+ vector<MQMessageExt>&
msgs) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_WARN("Pull request has been released");
+ return;
+ }
m_ioService.post(boost::bind(&ConsumeMessageOrderlyService::ConsumeRequest,
this, request));
}
void ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void*
context,
-
PullRequest* request,
+
boost::weak_ptr<PullRequest> pullRequest,
bool
tryLockMQ,
boost::asio::deadline_timer* t) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_WARN("Pull request has been released");
+ return;
+ }
LOG_INFO("submit consumeRequest later for mq:%s",
request->m_messageQueue.toString().c_str());
vector<MQMessageExt> msgs;
ConsumeMessageOrderlyService* orderlyService =
(ConsumeMessageOrderlyService*)context;
@@ -122,7 +133,12 @@ void
ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* contex
deleteAndZero(t);
}
-void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) {
+void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest>
pullRequest) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_WARN("Pull request has been released");
+ return;
+ }
bool bGetMutex = false;
boost::unique_lock<boost::timed_mutex>
lock(request->getPullRequestCriticalSection(), boost::try_to_lock);
if (!lock.owns_lock()) {
@@ -140,7 +156,7 @@ void
ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) {
// request->m_messageQueue.toString().c_str());
return;
}
- if (!request || request->isDroped()) {
+ if (!request || request->isDropped()) {
LOG_WARN("the pull result is NULL or Had been dropped");
request->clearAllMsgs(); // add clear operation to avoid bad state when
// dropped pullRequest returns normal
@@ -189,11 +205,16 @@ void
ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) {
}
}
}
-void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(PullRequest*
request, bool tryLockMQ) {
+void
ConsumeMessageOrderlyService::tryLockLaterAndReconsume(boost::weak_ptr<PullRequest>
pullRequest, bool tryLockMQ) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_WARN("Pull request has been released");
+ return;
+ }
int retryTimer = tryLockMQ ? 500 : 100;
boost::asio::deadline_timer* t =
new boost::asio::deadline_timer(m_async_ioService,
boost::posix_time::milliseconds(retryTimer));
t->async_wait(
boost::bind(&(ConsumeMessageOrderlyService::static_submitConsumeRequestLater),
this, request, tryLockMQ, t));
}
-}
+} // namespace rocketmq
diff --git a/src/consumer/ConsumeMsgService.h b/src/consumer/ConsumeMsgService.h
index 2bb7979..a5d3ce7 100644
--- a/src/consumer/ConsumeMsgService.h
+++ b/src/consumer/ConsumeMsgService.h
@@ -38,7 +38,7 @@ class ConsumeMsgService {
virtual void start() {}
virtual void shutdown() {}
virtual void stopThreadPool() {}
- virtual void submitConsumeRequest(PullRequest* request,
vector<MQMessageExt>& msgs) {}
+ virtual void submitConsumeRequest(boost::weak_ptr<PullRequest> request,
vector<MQMessageExt>& msgs) {}
virtual MessageListenerType getConsumeMsgSerivceListenerType() { return
messageListenerDefaultly; }
};
@@ -48,11 +48,11 @@ class ConsumeMessageConcurrentlyService : public
ConsumeMsgService {
virtual ~ConsumeMessageConcurrentlyService();
virtual void start();
virtual void shutdown();
- virtual void submitConsumeRequest(PullRequest* request,
vector<MQMessageExt>& msgs);
+ virtual void submitConsumeRequest(boost::weak_ptr<PullRequest> request,
vector<MQMessageExt>& msgs);
virtual MessageListenerType getConsumeMsgSerivceListenerType();
virtual void stopThreadPool();
- void ConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs);
+ void ConsumeRequest(boost::weak_ptr<PullRequest> request,
vector<MQMessageExt>& msgs);
private:
void resetRetryTopic(vector<MQMessageExt>& msgs);
@@ -71,17 +71,17 @@ class ConsumeMessageOrderlyService : public
ConsumeMsgService {
virtual ~ConsumeMessageOrderlyService();
virtual void start();
virtual void shutdown();
- virtual void submitConsumeRequest(PullRequest* request,
vector<MQMessageExt>& msgs);
+ virtual void submitConsumeRequest(boost::weak_ptr<PullRequest> request,
vector<MQMessageExt>& msgs);
virtual void stopThreadPool();
virtual MessageListenerType getConsumeMsgSerivceListenerType();
void boost_asio_work();
- void tryLockLaterAndReconsume(PullRequest* request, bool tryLockMQ);
+ void tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> request, bool
tryLockMQ);
static void static_submitConsumeRequestLater(void* context,
- PullRequest* request,
+ boost::weak_ptr<PullRequest>
request,
bool tryLockMQ,
boost::asio::deadline_timer* t);
- void ConsumeRequest(PullRequest* request);
+ void ConsumeRequest(boost::weak_ptr<PullRequest> request);
void lockMQPeriodically(boost::system::error_code& ec,
boost::asio::deadline_timer* t);
void unlockAllMQ();
bool lockOneMQ(const MQMessageQueue& mq);
@@ -99,6 +99,6 @@ class ConsumeMessageOrderlyService : public ConsumeMsgService
{
};
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
#endif //<! _CONSUMEMESSAGESERVICE_H_
diff --git a/src/consumer/DefaultMQPullConsumer.cpp
b/src/consumer/DefaultMQPullConsumer.cpp
index 2175feb..d3ce978 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -369,11 +369,13 @@ void
DefaultMQPullConsumer::getSubscriptions(vector<SubscriptionData>& result) {
}
}
-void DefaultMQPullConsumer::producePullMsgTask(PullRequest*) {}
+bool DefaultMQPullConsumer::producePullMsgTask(boost::weak_ptr<PullRequest>
pullRequest) {
+ return true;
+}
Rebalance* DefaultMQPullConsumer::getRebalance() const {
return NULL;
}
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/consumer/DefaultMQPushConsumer.cpp
b/src/consumer/DefaultMQPushConsumer.cpp
index 7e47e15..0cd5427 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -23,8 +23,6 @@
#include "Logging.h"
#include "MQClientAPIImpl.h"
#include "MQClientFactory.h"
-#include "MQClientManager.h"
-#include "MQProtos.h"
#include "OffsetStore.h"
#include "PullAPIWrapper.h"
#include "PullSysFlag.h"
@@ -37,150 +35,157 @@ namespace rocketmq {
class AsyncPullCallback : public PullCallback {
public:
- AsyncPullCallback(DefaultMQPushConsumer* pushConsumer, PullRequest* request)
+ AsyncPullCallback(DefaultMQPushConsumer* pushConsumer,
boost::weak_ptr<PullRequest> request)
: m_callbackOwner(pushConsumer), m_pullRequest(request),
m_bShutdown(false) {}
- virtual ~AsyncPullCallback() {
- m_callbackOwner = NULL;
- m_pullRequest = NULL;
- }
+ virtual ~AsyncPullCallback() { m_callbackOwner = NULL; }
virtual void onSuccess(MQMessageQueue& mq, PullResult& result, bool
bProducePullRequest) {
- if (m_bShutdown == true) {
- LOG_INFO("pullrequest for:%s in shutdown, return",
(m_pullRequest->m_messageQueue).toString().c_str());
- m_pullRequest->removePullMsgEvent();
+ boost::shared_ptr<PullRequest> pullRequest = m_pullRequest.lock();
+ if (!pullRequest) {
+ LOG_WARN("Pull request for[%s] has been released",
mq.toString().c_str());
return;
}
+ if (m_bShutdown) {
+ LOG_INFO("pullrequest for:%s in shutdown, return",
(pullRequest->m_messageQueue).toString().c_str());
+ return;
+ }
+ if (pullRequest->isDropped()) {
+ LOG_INFO("Pull request for queue[%s] has been set as dropped. Will NOT
pull this queue any more",
+ pullRequest->m_messageQueue.toString().c_str());
+ return;
+ }
switch (result.pullStatus) {
case FOUND: {
- if (!m_pullRequest->isDroped()) // if request is setted to dropped,
- // don't add msgFoundList to
- // m_msgTreeMap and don't call
- // producePullMsgTask
- { // avoid issue: pullMsg is sent out,
rebalance is doing concurrently
- // and this request is dropped, and then received pulled msgs.
- m_pullRequest->setNextOffset(result.nextBeginOffset);
- m_pullRequest->putMessage(result.msgFoundList);
-
-
m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(m_pullRequest,
result.msgFoundList);
-
- if (bProducePullRequest)
- m_callbackOwner->producePullMsgTask(m_pullRequest);
- else
- m_pullRequest->removePullMsgEvent();
-
- LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ", nextBeginOffset:%lld",
- (m_pullRequest->m_messageQueue).toString().c_str(),
result.msgFoundList.size(),
- result.nextBeginOffset);
+ if (pullRequest->isDropped()) {
+ LOG_INFO("[Dropped]Remove pullmsg event of mq:%s",
(pullRequest->m_messageQueue).toString().c_str());
+ break;
+ }
+ pullRequest->setNextOffset(result.nextBeginOffset);
+ pullRequest->putMessage(result.msgFoundList);
+
+
m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(pullRequest,
result.msgFoundList);
+
+ if (bProducePullRequest) {
+ m_callbackOwner->producePullMsgTask(pullRequest);
} else {
- LOG_INFO("remove pullmsg event of mq:%s",
(m_pullRequest->m_messageQueue).toString().c_str());
- m_pullRequest->removePullMsgEvent();
+ LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
+ (pullRequest->m_messageQueue).toString().c_str());
}
+
+ LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ", nextBeginOffset:%lld",
+ (pullRequest->m_messageQueue).toString().c_str(),
result.msgFoundList.size(), result.nextBeginOffset);
+
break;
}
case NO_NEW_MSG: {
- m_pullRequest->setNextOffset(result.nextBeginOffset);
+ if (pullRequest->isDropped()) {
+ LOG_INFO("[Dropped]Remove pullmsg event of mq:%s",
(pullRequest->m_messageQueue).toString().c_str());
+ break;
+ }
+ pullRequest->setNextOffset(result.nextBeginOffset);
vector<MQMessageExt> msgs;
- m_pullRequest->getMessage(msgs);
+ pullRequest->getMessage(msgs);
if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
- /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
- is kept, then consumer will enter following situation:
- 1>. get pull offset with 0 when do rebalance, and set
- m_offsetTable[mq] to 0;
- 2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
- offset increase by 800
- 3>. request->getMessage(msgs) always NULL
- 4>. we need update consumerOffset to nextBeginOffset indicated by
- broker
- but if really no new msg could be pulled, also go to this CASE
-
- LOG_INFO("maybe misMatch between broker and client happens, update
- consumerOffset to nextBeginOffset indicated by broker");*/
- m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
result.nextBeginOffset);
+ m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue,
result.nextBeginOffset);
}
- if (bProducePullRequest)
- m_callbackOwner->producePullMsgTask(m_pullRequest);
- else
- m_pullRequest->removePullMsgEvent();
-
- /*LOG_INFO("NO_NEW_MSG:%s,nextBeginOffset:%lld",
- (m_pullRequest->m_messageQueue).toString().c_str(),
- result.nextBeginOffset);*/
+ if (bProducePullRequest) {
+ m_callbackOwner->producePullMsgTask(pullRequest);
+ } else {
+ LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
+ (pullRequest->m_messageQueue).toString().c_str());
+ }
+ LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld",
pullRequest->m_messageQueue.toString().c_str(),
+ result.nextBeginOffset);
break;
}
case NO_MATCHED_MSG: {
- m_pullRequest->setNextOffset(result.nextBeginOffset);
+ if (pullRequest->isDropped()) {
+ LOG_INFO("[Dropped]Remove pullmsg event of mq:%s",
(pullRequest->m_messageQueue).toString().c_str());
+ break;
+ }
+ pullRequest->setNextOffset(result.nextBeginOffset);
vector<MQMessageExt> msgs;
- m_pullRequest->getMessage(msgs);
+ pullRequest->getMessage(msgs);
if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
- /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
- is kept, then consumer will enter following situation:
- 1>. get pull offset with 0 when do rebalance, and set
- m_offsetTable[mq] to 0;
- 2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
- offset increase by 800
- 3>. request->getMessage(msgs) always NULL
- 4>. we need update consumerOffset to nextBeginOffset indicated by
- broker
- but if really no new msg could be pulled, also go to this CASE
-
- LOG_INFO("maybe misMatch between broker and client happens, update
- consumerOffset to nextBeginOffset indicated by broker");*/
- m_callbackOwner->updateConsumeOffset(m_pullRequest->m_messageQueue,
result.nextBeginOffset);
+ m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue,
result.nextBeginOffset);
+ }
+ if (bProducePullRequest) {
+ m_callbackOwner->producePullMsgTask(pullRequest);
+ } else {
+ LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
+ (pullRequest->m_messageQueue).toString().c_str());
}
- if (bProducePullRequest)
- m_callbackOwner->producePullMsgTask(m_pullRequest);
- else
- m_pullRequest->removePullMsgEvent();
- /*LOG_INFO("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
- (m_pullRequest->m_messageQueue).toString().c_str(),
- result.nextBeginOffset);*/
+ LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld",
pullRequest->m_messageQueue.toString().c_str(),
+ result.nextBeginOffset);
break;
}
case OFFSET_ILLEGAL: {
- m_pullRequest->setNextOffset(result.nextBeginOffset);
- if (bProducePullRequest)
- m_callbackOwner->producePullMsgTask(m_pullRequest);
- else
- m_pullRequest->removePullMsgEvent();
-
- /*LOG_INFO("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
- (m_pullRequest->m_messageQueue).toString().c_str(),
- result.nextBeginOffset);*/
+ if (pullRequest->isDropped()) {
+ LOG_INFO("[Dropped]Remove pullmsg event of mq:%s",
(pullRequest->m_messageQueue).toString().c_str());
+ break;
+ }
+ pullRequest->setNextOffset(result.nextBeginOffset);
+ if (bProducePullRequest) {
+ m_callbackOwner->producePullMsgTask(pullRequest);
+ } else {
+ LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
+ (pullRequest->m_messageQueue).toString().c_str());
+ }
+
+ LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld",
pullRequest->m_messageQueue.toString().c_str(),
+ result.nextBeginOffset);
break;
}
- case BROKER_TIMEOUT: { // as BROKER_TIMEOUT is defined by client, broker
- // will not returns this status, so this case
- // could not be entered.
+ case BROKER_TIMEOUT: {
+ if (pullRequest->isDropped()) {
+ LOG_INFO("[Dropped]Remove pullmsg event of mq:%s",
(pullRequest->m_messageQueue).toString().c_str());
+ break;
+ }
LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
- m_pullRequest->setNextOffset(result.nextBeginOffset);
- if (bProducePullRequest)
- m_callbackOwner->producePullMsgTask(m_pullRequest);
- else
- m_pullRequest->removePullMsgEvent();
+ pullRequest->setNextOffset(result.nextBeginOffset);
+ if (bProducePullRequest) {
+ m_callbackOwner->producePullMsgTask(pullRequest);
+ } else {
+ LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
+ (pullRequest->m_messageQueue).toString().c_str());
+ }
break;
}
}
}
virtual void onException(MQException& e) {
- if (m_bShutdown == true) {
- LOG_INFO("pullrequest for:%s in shutdown, return",
(m_pullRequest->m_messageQueue).toString().c_str());
- m_pullRequest->removePullMsgEvent();
+ boost::shared_ptr<PullRequest> pullRequest = m_pullRequest.lock();
+ if (!pullRequest) {
+ LOG_WARN("Pull request has been released.");
return;
}
- LOG_WARN("pullrequest for:%s occurs exception, reproduce it",
(m_pullRequest->m_messageQueue).toString().c_str());
- m_callbackOwner->producePullMsgTask(m_pullRequest);
+ std::string queueName = pullRequest->m_messageQueue.toString();
+ if (m_bShutdown) {
+ LOG_INFO("pullrequest for:%s in shutdown, return", queueName.c_str());
+ return;
+ }
+ if (pullRequest->isDropped()) {
+ LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", queueName.c_str());
+ return;
+ }
+ LOG_WARN("Pullrequest for:%s occurs exception, reproduce it after 1s.",
queueName.c_str());
+ m_callbackOwner->producePullMsgTaskLater(pullRequest, 1000);
}
void setShutdownStatus() { m_bShutdown = true; }
+ const boost::weak_ptr<PullRequest>& getPullRequest() const { return
m_pullRequest; }
+
+ void setPullRequest(boost::weak_ptr<PullRequest>& pullRequest) {
m_pullRequest = pullRequest; }
+
private:
DefaultMQPushConsumer* m_callbackOwner;
- PullRequest* m_pullRequest;
+ boost::weak_ptr<PullRequest> m_pullRequest;
bool m_bShutdown;
};
@@ -524,38 +529,84 @@ void DefaultMQPushConsumer::removeConsumeOffset(const
MQMessageQueue& mq) {
m_pOffsetStore->removeOffset(mq);
}
-void
DefaultMQPushConsumer::triggerNextPullRequest(boost::asio::deadline_timer* t,
PullRequest* request) {
- // LOG_INFO("trigger pullrequest for:%s",
- // (request->m_messageQueue).toString().c_str());
- producePullMsgTask(request);
+void DefaultMQPushConsumer::static_triggerNextPullRequest(void* context,
+
boost::asio::deadline_timer* t,
+
boost::weak_ptr<PullRequest> pullRequest) {
+ if (pullRequest.expired()) {
+ LOG_WARN("Pull request has been released before.");
+ return;
+ }
+ DefaultMQPushConsumer* pDefaultMQPushConsumer =
(DefaultMQPushConsumer*)context;
+ if (pDefaultMQPushConsumer) {
+ pDefaultMQPushConsumer->triggerNextPullRequest(t, pullRequest);
+ }
+}
+
+void
DefaultMQPushConsumer::triggerNextPullRequest(boost::asio::deadline_timer* t,
+
boost::weak_ptr<PullRequest> pullRequest) {
+ // delete first to avoild memleak
deleteAndZero(t);
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_WARN("Pull request has been released before.");
+ return;
+ }
+ producePullMsgTask(request);
}
-void DefaultMQPushConsumer::producePullMsgTask(PullRequest* request) {
+bool
DefaultMQPushConsumer::producePullMsgTaskLater(boost::weak_ptr<PullRequest>
pullRequest, int millis) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_INFO("Pull request is invalid. Maybe it is dropped before.");
+ return false;
+ }
+ if (request->isDropped()) {
+ LOG_INFO("[Dropped]Remove pullmsg event of mq:%s",
request->m_messageQueue.toString().c_str());
+ return false;
+ }
+ boost::asio::deadline_timer* t =
+ new boost::asio::deadline_timer(m_async_ioService,
boost::posix_time::milliseconds(millis));
+
t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest),
this, t, request));
+ LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.",
(request->m_messageQueue).toString().c_str(), millis);
+ return true;
+}
+
+bool DefaultMQPushConsumer::producePullMsgTask(boost::weak_ptr<PullRequest>
pullRequest) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_WARN("Pull request has been released.");
+ return false;
+ }
+ if (request->isDropped()) {
+ LOG_INFO("[Dropped]Remove pullmsg event of mq:%s",
request->m_messageQueue.toString().c_str());
+ return false;
+ }
if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) {
- request->addPullMsgEvent();
if (m_asyncPull) {
m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessageAsync,
this, request));
} else {
m_pullmsgQueue->produce(TaskBinder::gen(&DefaultMQPushConsumer::pullMessage,
this, request));
}
} else {
- LOG_WARN("produce pullmsg of mq:%s failed",
request->m_messageQueue.toString().c_str());
+ LOG_WARN("produce PullRequest of mq:%s failed",
request->m_messageQueue.toString().c_str());
+ return false;
}
+ return true;
}
void DefaultMQPushConsumer::runPullMsgQueue(TaskQueue* pTaskQueue) {
pTaskQueue->run();
}
-void DefaultMQPushConsumer::pullMessage(PullRequest* request) {
- if (request == NULL) {
- LOG_ERROR("Pull request is NULL, return");
+void DefaultMQPushConsumer::pullMessage(boost::weak_ptr<PullRequest>
pullRequest) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_ERROR("Pull request is released, return");
return;
}
- if (request->isDroped()) {
+ if (request->isDropped()) {
LOG_WARN("Pull request is set drop with mq:%s, return",
(request->m_messageQueue).toString().c_str());
- request->removePullMsgEvent();
+ // request->removePullMsgEvent();
return;
}
@@ -563,19 +614,19 @@ void DefaultMQPushConsumer::pullMessage(PullRequest*
request) {
if (m_consumerService->getConsumeMsgSerivceListenerType() ==
messageListenerOrderly) {
if (!request->isLocked() || request->isLockExpired()) {
if (!m_pRebalance->lock(messageQueue)) {
- producePullMsgTask(request);
+ request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+ producePullMsgTaskLater(request, 1000);
return;
}
}
}
if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
- // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
- // than:%d", (request->m_messageQueue).toString().c_str(),
- // request->getCacheMsgCount(), m_maxMsgCacheSize);
- boost::asio::deadline_timer* t =
- new boost::asio::deadline_timer(m_async_ioService,
boost::posix_time::milliseconds(1 * 1000));
- t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
this, t, request));
+ LOG_INFO("Sync Pull request for %s has Cached with %d Messages and The Max
size is %d, Sleep 1s.",
+ (request->m_messageQueue).toString().c_str(),
request->getCacheMsgCount(), m_maxMsgCacheSize);
+ request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+ // Retry 1s,
+ producePullMsgTaskLater(request, 1000);
return;
}
@@ -591,7 +642,9 @@ void DefaultMQPushConsumer::pullMessage(PullRequest*
request) {
string subExpression;
SubscriptionData* pSdata =
m_pRebalance->getSubscriptionData(messageQueue.getTopic());
if (pSdata == NULL) {
- producePullMsgTask(request);
+ LOG_INFO("Can not get SubscriptionData of Pull request for [%s], Sleep
1s.",
+ (request->m_messageQueue).toString().c_str());
+ producePullMsgTaskLater(request, 1000);
return;
}
subExpression = pSdata->getSubString();
@@ -600,7 +653,10 @@ void DefaultMQPushConsumer::pullMessage(PullRequest*
request) {
false, // suspend
!subExpression.empty(), //
subscription
false); // class
filter
-
+ if (request->isDropped()) {
+ LOG_WARN("Pull request is set as dropped with mq:%s, return",
(request->m_messageQueue).toString().c_str());
+ return;
+ }
try {
request->setLastPullTimestamp(UtilAll::currentTimeMillis());
unique_ptr<PullResult>
result(m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1
@@ -616,45 +672,35 @@ void DefaultMQPushConsumer::pullMessage(PullRequest*
request) {
NULL,
getSessionCredentials()));
PullResult pullResult = m_pPullAPIWrapper->processPullResult(messageQueue,
result.get(), pSdata);
-
switch (pullResult.pullStatus) {
case FOUND: {
- if (!request->isDroped()) // if request is setted to dropped, don't
add
- // msgFoundList to m_msgTreeMap and don't
- // call producePullMsgTask
- { // avoid issue: pullMsg is sent out,
rebalance is doing concurrently
- // and this request is dropped, and then received pulled msgs.
- request->setNextOffset(pullResult.nextBeginOffset);
- request->putMessage(pullResult.msgFoundList);
-
- m_consumerService->submitConsumeRequest(request,
pullResult.msgFoundList);
- producePullMsgTask(request);
-
- LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld",
messageQueue.toString().c_str(),
- pullResult.msgFoundList.size(),
pullResult.nextBeginOffset);
- } else {
- request->removePullMsgEvent();
+ if (request->isDropped()) {
+ LOG_INFO("Get pull result but the queue has been marked as dropped.
Queue: %s",
+ messageQueue.toString().c_str());
+ break;
}
+      // Avoid a race: a pull may already be in flight while rebalance concurrently
+      // drops this request, so a dropped request can still receive pulled msgs.
+ request->setNextOffset(pullResult.nextBeginOffset);
+ request->putMessage(pullResult.msgFoundList);
+
+ m_consumerService->submitConsumeRequest(request,
pullResult.msgFoundList);
+ producePullMsgTask(request);
+
+ LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld",
messageQueue.toString().c_str(),
+ pullResult.msgFoundList.size(), pullResult.nextBeginOffset);
+
break;
}
case NO_NEW_MSG: {
+ if (request->isDropped()) {
+ LOG_INFO("Get pull result but the queue has been marked as dropped.
Queue: %s",
+ messageQueue.toString().c_str());
+ break;
+ }
request->setNextOffset(pullResult.nextBeginOffset);
vector<MQMessageExt> msgs;
request->getMessage(msgs);
if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
- /*if broker losted/cleared msgs of one msgQueue, but the brokerOffset
- is kept, then consumer will enter following situation:
- 1>. get pull offset with 0 when do rebalance, and set
- m_offsetTable[mq] to 0;
- 2>. NO_NEW_MSG or NO_MATCHED_MSG got when pullMessage, and nextBegin
- offset increase by 800
- 3>. request->getMessage(msgs) always NULL
- 4>. we need update consumerOffset to nextBeginOffset indicated by
- broker
- but if really no new msg could be pulled, also go to this CASE
- */
- // LOG_DEBUG("maybe misMatch between broker and client happens,
update
- // consumerOffset to nextBeginOffset indicated by broker");
updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
}
producePullMsgTask(request);
@@ -662,12 +708,15 @@ void DefaultMQPushConsumer::pullMessage(PullRequest*
request) {
break;
}
case NO_MATCHED_MSG: {
+ if (request->isDropped()) {
+ LOG_INFO("Get pull result but the queue has been marked as dropped.
Queue: %s",
+ messageQueue.toString().c_str());
+ break;
+ }
request->setNextOffset(pullResult.nextBeginOffset);
vector<MQMessageExt> msgs;
request->getMessage(msgs);
if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
- // LOG_DEBUG("maybe misMatch between broker and client happens,
update
- // consumerOffset to nextBeginOffset indicated by broker");
updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
}
producePullMsgTask(request);
@@ -677,6 +726,11 @@ void DefaultMQPushConsumer::pullMessage(PullRequest*
request) {
break;
}
case OFFSET_ILLEGAL: {
+ if (request->isDropped()) {
+ LOG_INFO("Get pull result but the queue has been marked as dropped.
Queue: %s",
+ messageQueue.toString().c_str());
+ break;
+ }
request->setNextOffset(pullResult.nextBeginOffset);
producePullMsgTask(request);
@@ -695,11 +749,17 @@ void DefaultMQPushConsumer::pullMessage(PullRequest*
request) {
}
} catch (MQException& e) {
LOG_ERROR(e.what());
- producePullMsgTask(request);
+ LOG_WARN("Pull %s occur exception, restart 1s later.",
messageQueue.toString().c_str());
+ producePullMsgTaskLater(request, 1000);
}
}
-AsyncPullCallback* DefaultMQPushConsumer::getAsyncPullCallBack(PullRequest*
request, MQMessageQueue msgQueue) {
+AsyncPullCallback*
DefaultMQPushConsumer::getAsyncPullCallBack(boost::weak_ptr<PullRequest>
pullRequest,
+ MQMessageQueue
msgQueue) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ return NULL;
+ }
boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
if (m_asyncPull && request) {
PullMAP::iterator it = m_PullCallback.find(msgQueue);
@@ -707,7 +767,11 @@ AsyncPullCallback*
DefaultMQPushConsumer::getAsyncPullCallBack(PullRequest* requ
LOG_INFO("new pull callback for mq:%s", msgQueue.toString().c_str());
m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
}
- return m_PullCallback[msgQueue];
+ AsyncPullCallback* asyncPullCallback = m_PullCallback[msgQueue];
+ if (asyncPullCallback && asyncPullCallback->getPullRequest().expired()) {
+ asyncPullCallback->setPullRequest(pullRequest);
+ }
+ return asyncPullCallback;
}
return NULL;
@@ -727,34 +791,34 @@ void DefaultMQPushConsumer::shutdownAsyncPullCallBack() {
}
}
-void DefaultMQPushConsumer::pullMessageAsync(PullRequest* request) {
- if (request == NULL) {
- LOG_ERROR("Pull request is NULL, return");
+void DefaultMQPushConsumer::pullMessageAsync(boost::weak_ptr<PullRequest>
pullRequest) {
+ boost::shared_ptr<PullRequest> request = pullRequest.lock();
+ if (!request) {
+ LOG_ERROR("Pull request is released, return");
return;
}
- if (request->isDroped()) {
+ if (request->isDropped()) {
LOG_WARN("Pull request is set drop with mq:%s, return",
(request->m_messageQueue).toString().c_str());
- request->removePullMsgEvent();
return;
}
-
MQMessageQueue& messageQueue = request->m_messageQueue;
if (m_consumerService->getConsumeMsgSerivceListenerType() ==
messageListenerOrderly) {
if (!request->isLocked() || request->isLockExpired()) {
if (!m_pRebalance->lock(messageQueue)) {
- producePullMsgTask(request);
+ request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+ // Retry later.
+ producePullMsgTaskLater(request, 1000);
return;
}
}
}
if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
- // LOG_INFO("retry pullrequest for:%s after 1s, as cachMsgSize:%d is larger
- // than:%d", (request->m_messageQueue).toString().c_str(),
- // request->getCacheMsgCount(), m_maxMsgCacheSize);
- boost::asio::deadline_timer* t =
- new boost::asio::deadline_timer(m_async_ioService,
boost::posix_time::milliseconds(1 * 1000));
- t->async_wait(boost::bind(&DefaultMQPushConsumer::triggerNextPullRequest,
this, t, request));
+ LOG_INFO("Pull request for [%s] has Cached with %d Messages and The Max
size is %d, Sleep 3s.",
+ (request->m_messageQueue).toString().c_str(),
request->getCacheMsgCount(), m_maxMsgCacheSize);
+ request->setLastPullTimestamp(UtilAll::currentTimeMillis());
+ // Retry 3s,
+ producePullMsgTaskLater(request, 3000);
return;
}
@@ -770,7 +834,10 @@ void DefaultMQPushConsumer::pullMessageAsync(PullRequest*
request) {
string subExpression;
SubscriptionData* pSdata =
(m_pRebalance->getSubscriptionData(messageQueue.getTopic()));
if (pSdata == NULL) {
- producePullMsgTask(request);
+ LOG_INFO("Can not get SubscriptionData of Pull request for [%s], Sleep
1s.",
+ (request->m_messageQueue).toString().c_str());
+ // Subscribe data error, retry later.
+ producePullMsgTaskLater(request, 1000);
return;
}
subExpression = pSdata->getSubString();
@@ -784,24 +851,39 @@ void DefaultMQPushConsumer::pullMessageAsync(PullRequest*
request) {
arg.mq = messageQueue;
arg.subData = *pSdata;
arg.pPullWrapper = m_pPullAPIWrapper;
+ if (request->isDropped()) {
+ LOG_WARN("Pull request is set as dropped with mq:%s, return",
request->m_messageQueue.toString().c_str());
+ return;
+ }
try {
request->setLastPullTimestamp(UtilAll::currentTimeMillis());
- m_pPullAPIWrapper->pullKernelImpl(messageQueue,
// 1
- subExpression,
// 2
- pSdata->getSubVersion(),
// 3
- request->getNextOffset(),
// 4
- 32,
// 5
- sysFlag,
// 6
- commitOffsetValue,
// 7
- 1000 * 15,
// 8
- m_asyncPullTimeout,
// 9
- ComMode_ASYNC,
// 10
- getAsyncPullCallBack(request,
messageQueue), // 11
- getSessionCredentials(),
// 12
- &arg);
// 13
+ AsyncPullCallback* pullCallback = getAsyncPullCallBack(request,
messageQueue);
+ if (pullCallback == NULL) {
+ LOG_WARN("Can not get pull callback for:%s, Maybe this pull request has
been released.",
+ request->m_messageQueue.toString().c_str());
+ return;
+ }
+ m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1
+ subExpression, // 2
+ pSdata->getSubVersion(), // 3
+ request->getNextOffset(), // 4
+ 32, // 5
+ sysFlag, // 6
+ commitOffsetValue, // 7
+ 1000 * 15, // 8
+ m_asyncPullTimeout, // 9
+ ComMode_ASYNC, // 10
+ pullCallback, // 11
+ getSessionCredentials(), // 12
+ &arg); // 13
} catch (MQException& e) {
LOG_ERROR(e.what());
- producePullMsgTask(request);
+ if (request->isDropped()) {
+ LOG_WARN("Pull request is set as dropped with mq:%s, return",
(request->m_messageQueue).toString().c_str());
+ return;
+ }
+ LOG_INFO("Pull %s occur exception, restart 1s later.",
(request->m_messageQueue).toString().c_str());
+ producePullMsgTaskLater(request, 1000);
}
}
@@ -868,10 +950,10 @@ ConsumerRunningInfo*
DefaultMQPushConsumer::getConsumerRunningInfo() {
getSubscriptions(result);
info->setSubscriptionSet(result);
- std::map<MQMessageQueue, PullRequest*> requestTable =
m_pRebalance->getPullRequestTable();
+ std::map<MQMessageQueue, boost::shared_ptr<PullRequest>> requestTable =
m_pRebalance->getPullRequestTable();
for (const auto& it : requestTable) {
- if (!it.second->isDroped()) {
+ if (!it.second->isDropped()) {
MessageQueue queue((it.first).getTopic(), (it.first).getBrokerName(),
(it.first).getQueueId());
ProcessQueueInfo processQueue;
processQueue.cachedMsgMinOffset = it.second->getCacheMinOffset();
@@ -879,7 +961,7 @@ ConsumerRunningInfo*
DefaultMQPushConsumer::getConsumerRunningInfo() {
processQueue.cachedMsgCount = it.second->getCacheMsgCount();
processQueue.setCommitOffset(
m_pOffsetStore->readOffset(it.first, MEMORY_FIRST_THEN_STORE,
getSessionCredentials()));
- processQueue.setDroped(it.second->isDroped());
+ processQueue.setDroped(it.second->isDropped());
processQueue.setLocked(it.second->isLocked());
processQueue.lastLockTimestamp = it.second->getLastLockTimestamp();
processQueue.lastPullTimestamp = it.second->getLastPullTimestamp();
@@ -892,4 +974,4 @@ ConsumerRunningInfo*
DefaultMQPushConsumer::getConsumerRunningInfo() {
}
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/consumer/FindBrokerResult.h b/src/consumer/FindBrokerResult.h
index 789dc91..76ffc9b 100644
--- a/src/consumer/FindBrokerResult.h
+++ b/src/consumer/FindBrokerResult.h
@@ -27,6 +27,6 @@ struct FindBrokerResult {
std::string brokerAddr;
bool slave;
};
-}
+} // namespace rocketmq
#endif
diff --git a/src/consumer/OffsetStore.cpp b/src/consumer/OffsetStore.cpp
index fc2a5a0..324653d 100644
--- a/src/consumer/OffsetStore.cpp
+++ b/src/consumer/OffsetStore.cpp
@@ -276,7 +276,7 @@ void RemoteBrokerOffsetStore::persist(const MQMessageQueue&
mq, const SessionCre
try {
updateConsumeOffsetToBroker(mq, it->second, session_credentials);
} catch (MQException& e) {
- LOG_ERROR("updateConsumeOffsetToBroker error");
+ LOG_ERROR("updateConsumeOffsetToBroker %s ,offset:[%lld] error",
mq.toString().c_str(), it->second);
}
}
}
@@ -340,4 +340,4 @@ int64
RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(const MQMessageQueue
}
}
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/consumer/OffsetStore.h b/src/consumer/OffsetStore.h
index d10f233..d7ec92b 100644
--- a/src/consumer/OffsetStore.h
+++ b/src/consumer/OffsetStore.h
@@ -101,6 +101,6 @@ class RemoteBrokerOffsetStore : public OffsetStore {
int64 fetchConsumeOffsetFromBroker(const MQMessageQueue& mq, const
SessionCredentials& session_credentials);
};
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
#endif
diff --git a/src/consumer/PullAPIWrapper.cpp b/src/consumer/PullAPIWrapper.cpp
index 46832dd..a7f1156 100644
--- a/src/consumer/PullAPIWrapper.cpp
+++ b/src/consumer/PullAPIWrapper.cpp
@@ -130,4 +130,4 @@ PullResult* PullAPIWrapper::pullKernelImpl(const
MQMessageQueue& mq, // 1
THROW_MQEXCEPTION(MQClientException, "The broker not exist", -1);
}
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/consumer/PullAPIWrapper.h b/src/consumer/PullAPIWrapper.h
index 57c6f0d..29a64fb 100644
--- a/src/consumer/PullAPIWrapper.h
+++ b/src/consumer/PullAPIWrapper.h
@@ -61,6 +61,6 @@ class PullAPIWrapper {
};
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
#endif //<! _PULLAPIWRAPPER_H_
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index 162be2a..e578840 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -21,14 +21,18 @@ namespace rocketmq {
//<!***************************************************************************
const uint64 PullRequest::RebalanceLockInterval = 20 * 1000;
const uint64 PullRequest::RebalanceLockMaxLiveTime = 30 * 1000;
+/**
+ * If the process queue has not been pulled for more than MAX_PULL_IDLE_TIME,
we need to mark it as dropped
+ * default 120s
+ */
+const uint64 PullRequest::MAX_PULL_IDLE_TIME = 120 * 1000;
PullRequest::PullRequest(const string& groupname)
- : m_groupname(groupname),
- m_nextOffset(0),
- m_queueOffsetMax(0),
- m_bDroped(false),
- m_bLocked(false),
- m_bPullMsgEventInprogress(false) {}
+ : m_groupname(groupname), m_nextOffset(0), m_queueOffsetMax(0),
m_bDropped(false), m_bLocked(false) {
+ m_lastLockTimestamp = UtilAll::currentTimeMillis();
+ m_lastPullTimestamp = UtilAll::currentTimeMillis();
+ m_lastConsumeTimestamp = UtilAll::currentTimeMillis();
+}
PullRequest::~PullRequest() {
m_msgTreeMapTemp.clear();
@@ -40,11 +44,13 @@ PullRequest& PullRequest::operator=(const PullRequest&
other) {
if (this != &other) {
m_groupname = other.m_groupname;
m_nextOffset = other.m_nextOffset;
- m_bDroped.store(other.m_bDroped.load());
+ m_bDropped.store(other.m_bDropped.load());
m_queueOffsetMax = other.m_queueOffsetMax;
m_messageQueue = other.m_messageQueue;
m_msgTreeMap = other.m_msgTreeMap;
m_msgTreeMapTemp = other.m_msgTreeMapTemp;
+ m_lastPullTimestamp = other.m_lastPullTimestamp;
+ m_lastConsumeTimestamp = other.m_lastConsumeTimestamp;
}
return *this;
}
@@ -127,7 +133,7 @@ int64 PullRequest::removeMessage(vector<MQMessageExt>&
msgs) {
void PullRequest::clearAllMsgs() {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
- if (isDroped()) {
+ if (isDropped()) {
LOG_DEBUG("clear m_msgTreeMap as PullRequest had been dropped.");
m_msgTreeMap.clear();
m_msgTreeMapTemp.clear();
@@ -143,9 +149,9 @@ void PullRequest::updateQueueMaxOffset(int64 queueOffset) {
m_queueOffsetMax = queueOffset;
}
-void PullRequest::setDroped(bool droped) {
- int temp = (droped == true ? 1 : 0);
- m_bDroped.store(temp);
+void PullRequest::setDropped(bool dropped) {
+ int temp = (dropped == true ? 1 : 0);
+ m_bDropped.store(temp);
/*
m_queueOffsetMax = 0;
m_nextOffset = 0;
@@ -160,8 +166,8 @@ void PullRequest::setDroped(bool droped) {
*/
}
-bool PullRequest::isDroped() const {
- return m_bDroped.load() == 1;
+bool PullRequest::isDropped() const {
+ return m_bDropped.load() == 1;
}
int64 PullRequest::getNextOffset() {
@@ -173,6 +179,7 @@ void PullRequest::setLocked(bool Locked) {
int temp = (Locked == true ? 1 : 0);
m_bLocked.store(temp);
}
+
bool PullRequest::isLocked() const {
return m_bLocked.load() == 1;
}
@@ -197,6 +204,16 @@ uint64 PullRequest::getLastPullTimestamp() const {
return m_lastPullTimestamp;
}
+bool PullRequest::isPullRequestExpired() const {
+ uint64 interval = m_lastPullTimestamp + MAX_PULL_IDLE_TIME;
+ if (interval <= UtilAll::currentTimeMillis()) {
+ LOG_WARN("PullRequest for [%s] has been expired %lld
ms,m_lastPullTimestamp = %lld ms",
+ m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() -
interval, m_lastPullTimestamp);
+ return true;
+ }
+ return false;
+}
+
void PullRequest::setLastConsumeTimestamp(uint64 time) {
m_lastConsumeTimestamp = time;
}
@@ -257,18 +274,5 @@ int64 PullRequest::commit() {
}
}
-void PullRequest::removePullMsgEvent() {
- m_bPullMsgEventInprogress = false;
-}
-
-bool PullRequest::addPullMsgEvent() {
- if (m_bPullMsgEventInprogress == false) {
- m_bPullMsgEventInprogress = true;
- LOG_INFO("pullRequest with mq :%s set pullMsgEvent",
m_messageQueue.toString().c_str());
- return true;
- }
- return false;
-}
-
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/consumer/PullRequest.h b/src/consumer/PullRequest.h
index 83b39b8..c414576 100644
--- a/src/consumer/PullRequest.h
+++ b/src/consumer/PullRequest.h
@@ -41,8 +41,8 @@ class PullRequest {
PullRequest& operator=(const PullRequest& other);
- void setDroped(bool droped);
- bool isDroped() const;
+ void setDropped(bool dropped);
+ bool isDropped() const;
int64 getNextOffset();
void setNextOffset(int64 nextoffset);
@@ -58,6 +58,7 @@ class PullRequest {
int64 getLastLockTimestamp() const;
void setLastPullTimestamp(uint64 time);
uint64 getLastPullTimestamp() const;
+ bool isPullRequestExpired() const;
void setLastConsumeTimestamp(uint64 time);
uint64 getLastConsumeTimestamp() const;
void setTryUnlockTimes(int time);
@@ -66,19 +67,24 @@ class PullRequest {
int64 commit();
void makeMessageToCosumeAgain(vector<MQMessageExt>& msgs);
boost::timed_mutex& getPullRequestCriticalSection();
- void removePullMsgEvent();
+ bool removePullMsgEvent(bool force = false);
bool addPullMsgEvent();
+ /**
+ * Check if there is an in-flight pull request.
+ */
+ bool hasInFlightPullRequest() const;
public:
MQMessageQueue m_messageQueue;
static const uint64 RebalanceLockInterval; // ms
static const uint64 RebalanceLockMaxLiveTime; // ms
+ static const uint64 MAX_PULL_IDLE_TIME; // ms
private:
string m_groupname;
int64 m_nextOffset;
int64 m_queueOffsetMax;
- boost::atomic<bool> m_bDroped;
+ boost::atomic<bool> m_bDropped;
boost::atomic<bool> m_bLocked;
map<int64, MQMessageExt> m_msgTreeMap;
map<int64, MQMessageExt> m_msgTreeMapTemp;
@@ -88,9 +94,8 @@ class PullRequest {
uint64 m_lastPullTimestamp;
uint64 m_lastConsumeTimestamp;
boost::timed_mutex m_consumeLock;
- boost::atomic<bool> m_bPullMsgEventInprogress;
};
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
#endif
diff --git a/src/consumer/PullResult.cpp b/src/consumer/PullResult.cpp
index 5fc2f67..64e60cd 100644
--- a/src/consumer/PullResult.cpp
+++ b/src/consumer/PullResult.cpp
@@ -43,4 +43,4 @@ PullResult::~PullResult() {
}
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/consumer/PullResultExt.h b/src/consumer/PullResultExt.h
index c5141e2..644d4cd 100644
--- a/src/consumer/PullResultExt.h
+++ b/src/consumer/PullResultExt.h
@@ -59,4 +59,4 @@ class PullResultExt : public PullResult {
MemoryBlock msgMemBlock;
};
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 2f73bd1..0a4c06f 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -32,17 +32,19 @@ Rebalance::Rebalance(MQConsumer* consumer, MQClientFactory*
pfactory)
Rebalance::~Rebalance() {
{
map<string, SubscriptionData*>::iterator it = m_subscriptionData.begin();
- for (; it != m_subscriptionData.end(); ++it)
+ for (; it != m_subscriptionData.end(); ++it) {
deleteAndZero(it->second);
+ }
m_subscriptionData.clear();
}
{
- MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
- for (; it != m_requestQueueTable.end(); ++it) {
- delete it->second;
- it->second = NULL;
- }
- m_requestQueueTable.clear();
+ /*
+ MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
+ for (; it != m_requestQueueTable.end(); ++it) {
+ delete it->second;
+ it->second = NULL;
+ }
+ m_requestQueueTable.clear();*/
}
m_topicSubscribeInfoTable.clear();
m_pConsumer = NULL;
@@ -56,15 +58,21 @@ void Rebalance::doRebalance() {
map<string, SubscriptionData*>::iterator it = m_subscriptionData.begin();
for (; it != m_subscriptionData.end(); ++it) {
string topic = (it->first);
- LOG_INFO("current topic is:%s", topic.c_str());
+ LOG_DEBUG("current topic is:%s", topic.c_str());
//<!topic -> mqs
vector<MQMessageQueue> mqAll;
if (!getTopicSubscribeInfo(topic, mqAll)) {
continue;
}
if (mqAll.empty()) {
- if (!UtilAll::startsWith_retry(topic))
- THROW_MQEXCEPTION(MQClientException, "doRebalance the topic is
empty", -1);
+ if (!UtilAll::startsWith_retry(topic)) {
+ std::string msg("#doRebalance. mqAll for topic:");
+ msg.append(topic);
+ msg.append(" is empty");
+ LOG_ERROR("Queues to allocate are empty. Msg: %s", msg.c_str());
+          // TODO: decide whether to return an error code or throw an exception here
+ THROW_MQEXCEPTION(MQClientException, msg, -1);
+ }
}
//<!msg model;
@@ -82,30 +90,14 @@ void Rebalance::doRebalance() {
m_pConsumer->getSessionCredentials());
if (cidAll.empty()) {
- /*remove the droping pullRequest changes for recovery consume
fastly
- from network broken
- //drop all pullRequest
- MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
- for (; it != m_requestQueueTable.end(); ++it)
- {
- if(!(it->second->isDroped()))
- {
- MQMessageQueue mqtemp = it->first;
- it->second->setDroped(true);
- removeUnnecessaryMessageQueue(mqtemp);
- it->second->clearAllMsgs();//add clear operation to
- avoid bad
- state when dropped pullRequest returns normal
- LOG_INFO("find consumer failed, drop undropped mq:%s",
- mqtemp.toString().c_str());
- }
- }*/
-
+ LOG_ERROR("[ERROR] Get empty consumer IDs. Consumer Group: %s,
Topic: %s",
+ m_pConsumer->getGroupName().c_str(), topic.c_str());
+ // Should skip this round of re-balance immediately if consumer ID
set is empty.
THROW_MQEXCEPTION(MQClientException, "doRebalance the cidAll is
empty", -1);
}
// log
for (int i = 0; i < (int)cidAll.size(); ++i) {
- LOG_INFO("client id:%s of topic:%s", cidAll[i].c_str(),
topic.c_str());
+ LOG_DEBUG("client id:%s of topic:%s", cidAll[i].c_str(),
topic.c_str());
}
//<! sort;
sort(mqAll.begin(), mqAll.end());
@@ -116,17 +108,34 @@ void Rebalance::doRebalance() {
try {
m_pAllocateMQStrategy->allocate(m_pConsumer->getMQClientId(),
mqAll, cidAll, allocateResult);
} catch (MQException& e) {
- THROW_MQEXCEPTION(MQClientException, "allocate error", -1);
+ std::string errMsg("Allocate message queue for ConsumerGroup[");
+ errMsg.append(m_pConsumer->getGroupName());
+ errMsg.append("],Topic[");
+ errMsg.append(topic);
+ errMsg.append("] failed. ");
+ LOG_ERROR(errMsg.c_str());
+ THROW_MQEXCEPTION(MQClientException, errMsg, -1);
}
// log
for (int i = 0; i < (int)allocateResult.size(); ++i) {
- LOG_INFO("allocate mq:%s", allocateResult[i].toString().c_str());
+ LOG_DEBUG("allocate mq:%s", allocateResult[i].toString().c_str());
}
//<!update local;
bool changed = updateRequestTableInRebalance(topic, allocateResult);
if (changed) {
+ std::stringstream ss;
+ ss << "Allocation result for [Consumer Group: " <<
m_pConsumer->getGroupName() << ", Topic: " << topic
+ << ", Current Consumer ID: " << m_pConsumer->getMQClientId() <<
"] is changed.\n "
+ << "Total Queue #: " << mqAll.size() << ", Total Consumer #: "
<< cidAll.size()
+ << " Allocated Queues are: \n";
+
+ for (vector<MQMessageQueue>::size_type i = 0; i <
allocateResult.size(); ++i) {
+ ss << allocateResult[i].toString() << "\n";
+ }
+ // Log allocation result.
+ LOG_INFO(ss.str().c_str());
messageQueueChanged(topic, mqAll, allocateResult);
break;
}
@@ -148,7 +157,7 @@ void Rebalance::persistConsumerOffset() {
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
for (; it != m_requestQueueTable.end(); ++it) {
- if (it->second && (!it->second->isDroped())) {
+ if (it->second && (!it->second->isDropped())) {
mqs.push_back(it->first);
}
}
@@ -173,7 +182,7 @@ void Rebalance::persistConsumerOffsetByResetOffset() {
MQ2PULLREQ::iterator it = m_requestQueueTable.begin();
for (; it != m_requestQueueTable.end(); ++it) {
if (it->second) { // even if it was dropped, also need update offset
when
- // rcv resetOffset cmd
+ // rcv resetOffset cmd
mqs.push_back(it->first);
}
}
@@ -225,29 +234,42 @@ bool Rebalance::getTopicSubscribeInfo(const string&
topic, vector<MQMessageQueue
return false;
}
-void Rebalance::addPullRequest(MQMessageQueue mq, PullRequest* pPullRequest) {
+void Rebalance::addPullRequest(MQMessageQueue mq,
boost::shared_ptr<PullRequest> pPullRequest) {
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
m_requestQueueTable[mq] = pPullRequest;
}
-PullRequest* Rebalance::getPullRequest(MQMessageQueue mq) {
+void Rebalance::removePullRequest(MQMessageQueue mq) {
+ boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
+ if (m_requestQueueTable.find(mq) != m_requestQueueTable.end()) {
+ m_requestQueueTable.erase(mq);
+ }
+}
+bool Rebalance::isPullRequestExist(MQMessageQueue mq) {
+ boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
+ if (m_requestQueueTable.find(mq) != m_requestQueueTable.end()) {
+ return true;
+ }
+ return false;
+}
+boost::weak_ptr<PullRequest> Rebalance::getPullRequest(MQMessageQueue mq) {
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
if (m_requestQueueTable.find(mq) != m_requestQueueTable.end()) {
return m_requestQueueTable[mq];
}
- return NULL;
+ return boost::weak_ptr<PullRequest>();
}
-map<MQMessageQueue, PullRequest*> Rebalance::getPullRequestTable() {
+map<MQMessageQueue, boost::shared_ptr<PullRequest>>
Rebalance::getPullRequestTable() {
boost::lock_guard<boost::mutex> lock(m_requestTableMutex);
return m_requestQueueTable;
}
-void Rebalance::unlockAll(bool oneway) {
+void Rebalance::unlockAll(bool oneWay) {
map<string, vector<MQMessageQueue>*> brokerMqs;
MQ2PULLREQ requestQueueTable = getPullRequestTable();
for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it !=
requestQueueTable.end(); ++it) {
- if (!(it->second->isDroped())) {
+ if (!(it->second->isDropped())) {
if (brokerMqs.find(it->first.getBrokerName()) == brokerMqs.end()) {
vector<MQMessageQueue>* mqs = new vector<MQMessageQueue>;
brokerMqs[it->first.getBrokerName()] = mqs;
@@ -274,10 +296,10 @@ void Rebalance::unlockAll(bool oneway) {
m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(pFindBrokerResult->brokerAddr,
unlockBatchRequest.get(),
1000,
m_pConsumer->getSessionCredentials());
for (unsigned int i = 0; i != mqs.size(); ++i) {
- PullRequest* pullreq = getPullRequest(mqs[i]);
- if (pullreq) {
+ boost::weak_ptr<PullRequest> pullreq = getPullRequest(mqs[i]);
+ if (!pullreq.expired()) {
LOG_INFO("unlockBatchMQ success of mq:%s",
mqs[i].toString().c_str());
- pullreq->setLocked(false);
+ pullreq.lock()->setLocked(false);
} else {
LOG_ERROR("unlockBatchMQ fails of mq:%s", mqs[i].toString().c_str());
}
@@ -308,10 +330,10 @@ void Rebalance::unlock(MQMessageQueue mq) {
m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(pFindBrokerResult->brokerAddr,
unlockBatchRequest.get(), 1000,
m_pConsumer->getSessionCredentials());
for (unsigned int i = 0; i != mqs.size(); ++i) {
- PullRequest* pullreq = getPullRequest(mqs[i]);
- if (pullreq) {
+ boost::weak_ptr<PullRequest> pullreq = getPullRequest(mqs[i]);
+ if (!pullreq.expired()) {
LOG_INFO("unlock success of mq:%s", mqs[i].toString().c_str());
- pullreq->setLocked(false);
+ pullreq.lock()->setLocked(false);
} else {
LOG_ERROR("unlock fails of mq:%s", mqs[i].toString().c_str());
}
@@ -325,7 +347,7 @@ void Rebalance::lockAll() {
map<string, vector<MQMessageQueue>*> brokerMqs;
MQ2PULLREQ requestQueueTable = getPullRequestTable();
for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it !=
requestQueueTable.end(); ++it) {
- if (!(it->second->isDroped())) {
+ if (!(it->second->isDropped())) {
string brokerKey = it->first.getBrokerName() + it->first.getTopic();
if (brokerMqs.find(brokerKey) == brokerMqs.end()) {
vector<MQMessageQueue>* mqs = new vector<MQMessageQueue>;
@@ -355,11 +377,11 @@ void Rebalance::lockAll() {
m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(pFindBrokerResult->brokerAddr,
lockBatchRequest.get(),
messageQueues, 1000,
m_pConsumer->getSessionCredentials());
for (unsigned int i = 0; i != messageQueues.size(); ++i) {
- PullRequest* pullreq = getPullRequest(messageQueues[i]);
- if (pullreq) {
+ boost::weak_ptr<PullRequest> pullreq =
getPullRequest(messageQueues[i]);
+ if (!pullreq.expired()) {
LOG_INFO("lockBatchMQ success of mq:%s",
messageQueues[i].toString().c_str());
- pullreq->setLocked(true);
- pullreq->setLastLockTimestamp(UtilAll::currentTimeMillis());
+ pullreq.lock()->setLocked(true);
+ pullreq.lock()->setLastLockTimestamp(UtilAll::currentTimeMillis());
} else {
LOG_ERROR("lockBatchMQ fails of mq:%s",
messageQueues[i].toString().c_str());
}
@@ -372,6 +394,7 @@ void Rebalance::lockAll() {
}
brokerMqs.clear();
}
+
bool Rebalance::lock(MQMessageQueue mq) {
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
MASTER_ID, true));
@@ -397,11 +420,11 @@ bool Rebalance::lock(MQMessageQueue mq) {
return false;
}
for (unsigned int i = 0; i != messageQueues.size(); ++i) {
- PullRequest* pullreq = getPullRequest(messageQueues[i]);
- if (pullreq) {
+ boost::weak_ptr<PullRequest> pullreq = getPullRequest(messageQueues[i]);
+ if (!pullreq.expired()) {
LOG_INFO("lock success of mq:%s", messageQueues[i].toString().c_str());
- pullreq->setLocked(true);
- pullreq->setLastLockTimestamp(UtilAll::currentTimeMillis());
+ pullreq.lock()->setLocked(true);
+ pullreq.lock()->setLastLockTimestamp(UtilAll::currentTimeMillis());
lockResult = true;
} else {
LOG_ERROR("lock fails of mq:%s", messageQueues[i].toString().c_str());
@@ -436,113 +459,105 @@ void RebalancePull::removeUnnecessaryMessageQueue(const
MQMessageQueue& mq) {}
RebalancePush::RebalancePush(MQConsumer* consumer, MQClientFactory* pfactory)
: Rebalance(consumer, pfactory) {}
bool RebalancePush::updateRequestTableInRebalance(const string& topic,
vector<MQMessageQueue>& mqsSelf) {
- LOG_DEBUG("updateRequestTableInRebalance Enter");
+ LOG_DEBUG("updateRequestTableInRebalance for Topic[%s] Enter",
topic.c_str());
+
+ // 1. Clear no in charge of
+ // 1. set dropped
+ // 2. clear local message
+ // 3. clear offset
+ // 4. remove request table
+ // 5. set flag for route changed
+ // 2. Check and clear dropped/invalid pullrequest(timeout and so on)
+ // 3. Add new mq in charge of
+ // 1. new pullrequest
+ // 2. init next pull offset
+  //      3. init offset
+ // 4. add request table
+ // 5. set flag for route changed
+ // 4. Start long pull for request
if (mqsSelf.empty()) {
LOG_WARN("allocated queue is empty for topic:%s", topic.c_str());
}
bool changed = false;
- //<!remove
+  //<!remove non-responsive mq
MQ2PULLREQ requestQueueTable(getPullRequestTable());
- MQ2PULLREQ::iterator it = requestQueueTable.begin();
- for (; it != requestQueueTable.end(); ++it) {
- MQMessageQueue mqtemp = it->first;
+ MQ2PULLREQ::iterator itDel = requestQueueTable.begin();
+ for (; itDel != requestQueueTable.end(); ++itDel) {
+ MQMessageQueue mqtemp = itDel->first;
if (mqtemp.getTopic().compare(topic) == 0) {
- if (mqsSelf.empty() || (find(mqsSelf.begin(), mqsSelf.end(), mqtemp) ==
mqsSelf.end())) {
- if (!(it->second->isDroped())) {
- it->second->setDroped(true);
- // delete the lastest pull request for this mq, which hasn't been
response
- // m_pClientFactory->removeDropedPullRequestOpaque(it->second);
- removeUnnecessaryMessageQueue(mqtemp);
- it->second->clearAllMsgs(); // add clear operation to avoid bad
state
- // when dropped pullRequest returns
- // normal
- LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
- }
+ if (mqsSelf.empty() || (std::find(mqsSelf.begin(), mqsSelf.end(),
mqtemp) == mqsSelf.end())) {
+        // if not responsive, set to dropped
+ LOG_INFO("Drop mq:%s,because not responsive",
mqtemp.toString().c_str());
+ itDel->second->setDropped(true);
+ // remove offset table to avoid offset backup
+ removeUnnecessaryMessageQueue(mqtemp);
+ itDel->second->clearAllMsgs();
+ removePullRequest(mqtemp);
+ changed = true;
+ } else if (itDel->second->isPullRequestExpired()) {
+ // if pull expired , set to dropped, eg: if add pull task error, the
pull request will be expired.
+ LOG_INFO("Drop mq:%s according Pull timeout.",
mqtemp.toString().c_str());
+ itDel->second->setDropped(true);
+ removeUnnecessaryMessageQueue(mqtemp);
+ itDel->second->clearAllMsgs();
+ removePullRequest(mqtemp);
changed = true;
}
}
}
- //<!add
- vector<PullRequest*> pullrequestAdd;
- DefaultMQPushConsumer* pConsumer =
static_cast<DefaultMQPushConsumer*>(m_pConsumer);
- vector<MQMessageQueue>::iterator it2 = mqsSelf.begin();
- for (; it2 != mqsSelf.end(); ++it2) {
- PullRequest* pPullRequest(getPullRequest(*it2));
- if (pPullRequest && pPullRequest->isDroped()) {
- LOG_DEBUG(
- "before resume the pull handle of this pullRequest, its mq is:%s, "
- "its offset is:%lld",
- (it2->toString()).c_str(), pPullRequest->getNextOffset());
- pConsumer->getOffsetStore()->removeOffset(*it2); // remove dirty offset
which maybe update to
- //
OffsetStore::m_offsetTable by consuming After last
- // drop
- int64 nextOffset = computePullFromWhere(*it2);
- if (nextOffset >= 0) {
- /*
- Fix issue with following scenario:
- 1. pullRequest was dropped
- 2. the pullMsgEvent was not executed by taskQueue, so the
PullMsgEvent
- was not stop
- 3. pullReuest was resumed by next doRebalance, then mulitple
- pullMsgEvent were produced for pullRequest
- */
- bool bPullMsgEvent = pPullRequest->addPullMsgEvent();
- while (!bPullMsgEvent) {
- boost::this_thread::sleep_for(boost::chrono::milliseconds(50));
- LOG_INFO("pullRequest with mq :%s has unfinished pullMsgEvent",
(it2->toString()).c_str());
- bPullMsgEvent = pPullRequest->addPullMsgEvent();
- }
- pPullRequest->setDroped(false);
- pPullRequest->clearAllMsgs(); // avoid consume accumulation and
consume
- // dumplication issues
- pPullRequest->setNextOffset(nextOffset);
- pPullRequest->updateQueueMaxOffset(nextOffset);
- LOG_INFO(
- "after resume the pull handle of this pullRequest, its mq is:%s, "
- "its offset is:%lld",
- (it2->toString()).c_str(), pPullRequest->getNextOffset());
- changed = true;
- pConsumer->producePullMsgTask(pPullRequest);
- } else {
- LOG_ERROR("get fatel error QueryOffset of mq:%s, do not reconsume this
queue", (it2->toString()).c_str());
- }
+ //<!add check new mq added.
+ vector<boost::shared_ptr<PullRequest>> pullRequestsToAdd;
+ vector<MQMessageQueue>::iterator itAdd = mqsSelf.begin();
+ for (; itAdd != mqsSelf.end(); ++itAdd) {
+ if (isPullRequestExist(*itAdd)) {
+      // expired pull requests were already checked and removed above; keep the existing one.
+ continue;
}
-
- if (!pPullRequest) {
- LOG_INFO("updateRequestTableInRebalance Doesn't find old mq");
- PullRequest* pullRequest = new PullRequest(m_pConsumer->getGroupName());
- pullRequest->m_messageQueue = *it2;
-
- int64 nextOffset = computePullFromWhere(*it2);
- if (nextOffset >= 0) {
- pullRequest->setNextOffset(nextOffset);
- pullRequest->clearAllMsgs(); // avoid consume accumulation and consume
- // dumplication issues
- changed = true;
- //<! mq-> pq;
- addPullRequest(*it2, pullRequest);
- pullrequestAdd.push_back(pullRequest);
- LOG_INFO("add mq:%s, request initiall offset:%lld",
(*it2).toString().c_str(), nextOffset);
- }
+ boost::shared_ptr<PullRequest> pullRequest =
boost::make_shared<PullRequest>(m_pConsumer->getGroupName());
+ pullRequest->m_messageQueue = *itAdd;
+ int64 nextOffset = computePullFromWhere(*itAdd);
+ if (nextOffset >= 0) {
+ pullRequest->setNextOffset(nextOffset);
+ changed = true;
+ addPullRequest(*itAdd, pullRequest);
+ pullRequestsToAdd.push_back(pullRequest);
+ LOG_INFO("Add mq:%s, request initial offset:%ld",
(*itAdd).toString().c_str(), nextOffset);
+ } else {
+ LOG_WARN(
+ "Failed to add pull request for %s due to failure of querying
consume offset, request initial offset:%ld",
+ (*itAdd).toString().c_str(), nextOffset);
}
}
- vector<PullRequest*>::iterator it3 = pullrequestAdd.begin();
- for (; it3 != pullrequestAdd.end(); ++it3) {
- LOG_DEBUG("start pull request");
- pConsumer->producePullMsgTask(*it3);
+ for (vector<boost::shared_ptr<PullRequest>>::iterator itAdded =
pullRequestsToAdd.begin();
+ itAdded != pullRequestsToAdd.end(); ++itAdded) {
+ LOG_INFO("Start to pull %s, offset:%ld, GroupName %s",
(*itAdded)->m_messageQueue.toString().c_str(),
+ (*itAdded)->getNextOffset(), (*itAdded)->getGroupName().c_str());
+ if (!m_pConsumer->producePullMsgTask(*itAdded)) {
+ LOG_WARN(
+ "Failed to producer pull message task for %s, Remove it from Request
table and wait for next #Rebalance.",
+ (*itAdded)->m_messageQueue.toString().c_str());
+ // remove from request table, and wait for next rebalance.
+ (*itAdded)->setDropped(true);
+ removePullRequest((*itAdded)->m_messageQueue);
+ }
}
- LOG_DEBUG("updateRequestTableInRebalance exit");
+ LOG_DEBUG("updateRequestTableInRebalance Topic[%s] exit", topic.c_str());
return changed;
}
int64 RebalancePush::computePullFromWhere(const MQMessageQueue& mq) {
int64 result = -1;
- DefaultMQPushConsumer* pConsumer =
static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+ DefaultMQPushConsumer* pConsumer =
dynamic_cast<DefaultMQPushConsumer*>(m_pConsumer);
+ if (!pConsumer) {
+ LOG_ERROR("Cast consumer pointer to DefaultMQPushConsumer pointer failed
when computePullFromWhere %s",
+ mq.toString().c_str());
+ return result;
+ }
ConsumeFromWhere consumeFromWhere = pConsumer->getConsumeFromWhere();
OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
switch (consumeFromWhere) {
@@ -623,7 +638,12 @@ void RebalancePush::messageQueueChanged(const string&
topic,
vector<MQMessageQueue>& mqDivided) {}
void RebalancePush::removeUnnecessaryMessageQueue(const MQMessageQueue& mq) {
- DefaultMQPushConsumer* pConsumer =
static_cast<DefaultMQPushConsumer*>(m_pConsumer);
+ // DefaultMQPushConsumer *pConsumer = static_cast<DefaultMQPushConsumer
*>(m_pConsumer);
+ DefaultMQPushConsumer* pConsumer =
dynamic_cast<DefaultMQPushConsumer*>(m_pConsumer);
+ if (!pConsumer) {
+ LOG_ERROR("Cast MQConsumer* to DefaultMQPushConsumer* failed when remove
%s", mq.toString().c_str());
+ return;
+ }
OffsetStore* pOffsetStore = pConsumer->getOffsetStore();
pOffsetStore->persist(mq, m_pConsumer->getSessionCredentials());
diff --git a/src/consumer/Rebalance.h b/src/consumer/Rebalance.h
index 8d8a9ae..fe39569 100644
--- a/src/consumer/Rebalance.h
+++ b/src/consumer/Rebalance.h
@@ -24,14 +24,17 @@
#include "PullRequest.h"
#include "SubscriptionData.h"
+#include <boost/smart_ptr.hpp>
#include <boost/thread/mutex.hpp>
namespace rocketmq {
class MQClientFactory;
+
//<!************************************************************************
class Rebalance {
public:
Rebalance(MQConsumer*, MQClientFactory*);
+
virtual ~Rebalance();
virtual void messageQueueChanged(const string& topic,
@@ -46,24 +49,36 @@ class Rebalance {
public:
void doRebalance();
+
void persistConsumerOffset();
+
void persistConsumerOffsetByResetOffset();
+
//<!m_subscriptionInner;
SubscriptionData* getSubscriptionData(const string& topic);
+
void setSubscriptionData(const string& topic, SubscriptionData* pdata);
map<string, SubscriptionData*>& getSubscriptionInner();
//<!m_topicSubscribeInfoTable;
void setTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& mqs);
+
bool getTopicSubscribeInfo(const string& topic, vector<MQMessageQueue>& mqs);
- void addPullRequest(MQMessageQueue mq, PullRequest* pPullRequest);
- PullRequest* getPullRequest(MQMessageQueue mq);
- map<MQMessageQueue, PullRequest*> getPullRequestTable();
+ void addPullRequest(MQMessageQueue mq, boost::shared_ptr<PullRequest>
pPullRequest);
+ void removePullRequest(MQMessageQueue mq);
+ bool isPullRequestExist(MQMessageQueue mq);
+ boost::weak_ptr<PullRequest> getPullRequest(MQMessageQueue mq);
+
+ map<MQMessageQueue, boost::shared_ptr<PullRequest>> getPullRequestTable();
+
void lockAll();
+
bool lock(MQMessageQueue mq);
- void unlockAll(bool oneway = false);
+
+ void unlockAll(bool oneWay = false);
+
void unlock(MQMessageQueue mq);
protected:
@@ -71,7 +86,7 @@ class Rebalance {
boost::mutex m_topicSubscribeInfoTableMutex;
map<string, vector<MQMessageQueue>> m_topicSubscribeInfoTable;
- typedef map<MQMessageQueue, PullRequest*> MQ2PULLREQ;
+ typedef map<MQMessageQueue, boost::shared_ptr<PullRequest>> MQ2PULLREQ;
MQ2PULLREQ m_requestQueueTable;
boost::mutex m_requestTableMutex;
@@ -84,6 +99,7 @@ class Rebalance {
class RebalancePull : public Rebalance {
public:
RebalancePull(MQConsumer*, MQClientFactory*);
+
virtual ~RebalancePull(){};
virtual void messageQueueChanged(const string& topic,
@@ -101,6 +117,7 @@ class RebalancePull : public Rebalance {
class RebalancePush : public Rebalance {
public:
RebalancePush(MQConsumer*, MQClientFactory*);
+
virtual ~RebalancePush(){};
virtual void messageQueueChanged(const string& topic,
@@ -115,6 +132,6 @@ class RebalancePush : public Rebalance {
};
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
#endif
diff --git a/src/consumer/SubscriptionData.cpp
b/src/consumer/SubscriptionData.cpp
index e771b52..433fce1 100644
--- a/src/consumer/SubscriptionData.cpp
+++ b/src/consumer/SubscriptionData.cpp
@@ -18,8 +18,8 @@
#include <algorithm>
#include <sstream>
#include <vector>
-#include "UtilAll.h"
#include "Logging.h"
+#include "UtilAll.h"
namespace rocketmq {
//<!************************************************************************
SubscriptionData::SubscriptionData() {
@@ -127,4 +127,4 @@ Json::Value SubscriptionData::toJson() const {
}
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/consumer/SubscriptionData.h b/src/consumer/SubscriptionData.h
index 89be74f..40e985b 100644
--- a/src/consumer/SubscriptionData.h
+++ b/src/consumer/SubscriptionData.h
@@ -57,6 +57,6 @@ class SubscriptionData {
vector<int> m_codeSet;
};
//<!***************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
#endif