This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push: new fd29759 feat(consumer): remove event if consumer service shutdown (#233) fd29759 is described below commit fd29759a0280c4e2ffc63c1ac4578700dd68018e Author: dinglei <libya_...@163.com> AuthorDate: Thu Jan 16 15:19:36 2020 +0800 feat(consumer): remove event if consumer service shutdown (#233) --- src/consumer/ConsumeMessageConcurrentlyService.cpp | 10 ++++++++-- src/consumer/DefaultMQPushConsumer.cpp | 16 +++++++++++----- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp index 9c3a05b..deda8ac 100644 --- a/src/consumer/ConsumeMessageConcurrentlyService.cpp +++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp @@ -73,8 +73,11 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<Pul request->m_messageQueue.toString().c_str()); return; } - if (!request->isDropped()) { + if (!request->isDropped() && !m_ioService.stopped()) { m_ioService.post(boost::bind(&ConsumeMessageConcurrentlyService::ConsumeRequest, this, request, msgs)); + } else { + LOG_INFO("IOService stopped or Pull request for %s is dropped, will not post ConsumeRequest.", + request->m_messageQueue.toString().c_str()); } } void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_ptr<PullRequest> pullRequest, @@ -93,13 +96,16 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_pt (request->m_messageQueue).toString().c_str()); return; } - if (!request->isDropped()) { + if (!request->isDropped() && !m_ioService.stopped()) { boost::asio::deadline_timer* t = new boost::asio::deadline_timer(m_ioService, boost::posix_time::milliseconds(millis)); t->async_wait( boost::bind(&(ConsumeMessageConcurrentlyService::static_submitConsumeRequest), this, t, request, msgs)); LOG_INFO("Submit Message to Consumer [%s] Later and Sleep [%d]ms.", 
(request->m_messageQueue).toString().c_str(), millis); + } else { + LOG_INFO("IOService stopped or Pull request for %s is dropped, will not post delay ConsumeRequest.", + request->m_messageQueue.toString().c_str()); } } diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp index 8e2541e..9da0ca8 100644 --- a/src/consumer/DefaultMQPushConsumer.cpp +++ b/src/consumer/DefaultMQPushConsumer.cpp @@ -575,11 +575,17 @@ bool DefaultMQPushConsumer::producePullMsgTaskLater(boost::weak_ptr<PullRequest> LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", request->m_messageQueue.toString().c_str()); return false; } - boost::asio::deadline_timer* t = - new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(millis)); - t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest), this, t, request)); - LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(), millis); - return true; + if (m_pullmsgQueue->bTaskQueueStatusOK() && isServiceStateOk()) { + boost::asio::deadline_timer* t = + new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(millis)); + t->async_wait(boost::bind(&(DefaultMQPushConsumer::static_triggerNextPullRequest), this, t, request)); + LOG_INFO("Produce Pull request [%s] Later and Sleep [%d]ms.", (request->m_messageQueue).toString().c_str(), millis); + return true; + } else { + LOG_WARN("Service or TaskQueue shutdown, produce PullRequest of mq:%s failed", + request->m_messageQueue.toString().c_str()); + return false; + } } bool DefaultMQPushConsumer::producePullMsgTask(boost::weak_ptr<PullRequest> pullRequest) {