http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp new file mode 100755 index 0000000..8d7f8a1 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp @@ -0,0 +1,1018 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "DefaultMQPushConsumerImpl.h" + +#include <string> +#include <set> +#include "DefaultMQPushConsumer.h" +#include "ConsumerStatManage.h" +#include "DefaultMQPullConsumer.h" +#include "DefaultMQProducer.h" +#include "MQClientFactory.h" +#include "MQAdminImpl.h" +#include "RebalancePushImpl.h" +#include "MQClientAPIImpl.h" +#include "OffsetStore.h" +#include "MixAll.h" +#include "MQClientManager.h" +#include "LocalFileOffsetStore.h" +#include "RemoteBrokerOffsetStore.h" +#include "PullSysFlag.h" +#include "FilterAPI.h" +#include "PullAPIWrapper.h" +#include "MQClientException.h" +#include "Validators.h" +#include "MessageListener.h" +#include "ConsumeMessageHook.h" +#include "PullMessageService.h" +#include "ConsumeMessageOrderlyService.h" +#include "ConsumeMessageConcurrentlyService.h" +#include "KPRUtil.h" +#include "TimerThread.h" + +namespace rmq +{ + +/* RemoveProcessQueueLater */ +class RemoveProcessQueueLater : public kpr::TimerHandler +{ +public: + RemoveProcessQueueLater(DefaultMQPushConsumerImpl* pConsumerImp, PullRequest* pPullRequest) + : m_pConsumerImp(pConsumerImp), m_pPullRequest(pPullRequest) + { + } + + void OnTimeOut(unsigned int timerID) + { + try + { + m_pConsumerImp->getOffsetStore()->updateOffset(m_pPullRequest->getMessageQueue(), m_pPullRequest->getNextOffset(), false); + m_pConsumerImp->getOffsetStore()->persist(m_pPullRequest->getMessageQueue()); + m_pConsumerImp->getRebalanceImpl()->removeProcessQueue(m_pPullRequest->getMessageQueue()); + + RMQ_WARN("fix the pull request offset, {%s}", m_pPullRequest->toString().c_str()); + } + catch(...) + { + RMQ_ERROR("RemoveProcessQueueLater OnTimeOut Exception"); + } + + delete this; + } + +private: + DefaultMQPushConsumerImpl* m_pConsumerImp; + PullRequest* m_pPullRequest; +}; + + +/* DefaultMQPushConsumerImplCallback */ +class DefaultMQPushConsumerImplCallback : public PullCallback +{ +public: + DefaultMQPushConsumerImplCallback(SubscriptionData& subscriptionData, + DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl, + PullRequest* pPullRequest) + : m_subscriptionData(subscriptionData), + m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl), + m_pPullRequest(pPullRequest) + { + m_beginTimestamp = KPRUtil::GetCurrentTimeMillis(); + } + + void onSuccess(PullResult& pullResult) + { + RMQ_DEBUG("onSuccess begin: %s", pullResult.toString().c_str()); + PullResult* pPullResult = &pullResult; + if (pPullResult != NULL) + { + pPullResult = + m_pDefaultMQPushConsumerImpl->m_pPullAPIWrapper->processPullResult( + m_pPullRequest->getMessageQueue(), *pPullResult, m_subscriptionData); + + switch (pPullResult->pullStatus) + { + case FOUND: + { + m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset); + + long long pullRT = KPRUtil::GetCurrentTimeMillis() - m_beginTimestamp; + m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat() + .pullTimesTotal++; + m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat() + .pullRTTotal.fetchAndAdd(pullRT); + + ProcessQueue* processQueue = m_pPullRequest->getProcessQueue(); + bool dispatchToConsume = processQueue->putMessage(pPullResult->msgFoundList); + + m_pDefaultMQPushConsumerImpl->m_pConsumeMessageService->submitConsumeRequest(// + pPullResult->msgFoundList, // + processQueue, // + m_pPullRequest->getMessageQueue(), // + dispatchToConsume); + + if (m_pDefaultMQPushConsumerImpl->m_pDefaultMQPushConsumer->getPullInterval() > 0) + { + m_pDefaultMQPushConsumerImpl->executePullRequestLater(m_pPullRequest, + m_pDefaultMQPushConsumerImpl->m_pDefaultMQPushConsumer->getPullInterval()); + } + else + { + m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest); + } + } + break; + case NO_NEW_MSG: + m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset); + m_pDefaultMQPushConsumerImpl->correctTagsOffset(*m_pPullRequest); + m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest); + break; + case NO_MATCHED_MSG: + m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset); + m_pDefaultMQPushConsumerImpl->correctTagsOffset(*m_pPullRequest); + m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest); + break; + case OFFSET_ILLEGAL: + RMQ_WARN("the pull request offset illegal, %s, %s", + m_pPullRequest->toString().c_str(), pPullResult->toString().c_str()); + + /* + if (m_pPullRequest->getNextOffset() < pPullResult->minOffset) + { + m_pPullRequest->setNextOffset(pPullResult->minOffset); + } + else if (m_pPullRequest->getNextOffset() > pPullResult->maxOffset) + { + m_pPullRequest->setNextOffset(pPullResult->maxOffset); + } + m_pDefaultMQPushConsumerImpl->m_pOffsetStore->updateOffset( + m_pPullRequest->getMessageQueue(), m_pPullRequest->getNextOffset(), false); + m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest); + */ + + // todo + m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset); + m_pPullRequest->getProcessQueue()->setDropped(true); + + m_pDefaultMQPushConsumerImpl->executeTaskLater(new RemoveProcessQueueLater( + m_pDefaultMQPushConsumerImpl, m_pPullRequest), 10000); + break; + default: + break; + } + } + else + { + RMQ_WARN("Warning: PullRequest is null!"); + } + RMQ_DEBUG("onSuccess end"); + } + + void onException(MQException& e) + { + std::string topic = m_pPullRequest->getMessageQueue().getTopic(); + if (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != std::string::npos) + { + RMQ_WARN("execute the pull request exception:%s", e.what()); + } + + m_pDefaultMQPushConsumerImpl->executePullRequestLater(m_pPullRequest, + DefaultMQPushConsumerImpl::s_PullTimeDelayMillsWhenException); + } + +private: + SubscriptionData m_subscriptionData; + DefaultMQPushConsumerImpl* m_pDefaultMQPushConsumerImpl; + PullRequest* m_pPullRequest; + unsigned long long m_beginTimestamp; +}; + + +DefaultMQPushConsumerImpl::DefaultMQPushConsumerImpl(DefaultMQPushConsumer* pDefaultMQPushConsumer) +{ + m_pDefaultMQPushConsumer = pDefaultMQPushConsumer; + m_serviceState = CREATE_JUST; + flowControlTimes1 = 0; + flowControlTimes2 = 0; + m_pause = false; + m_consumeOrderly = false; + + m_pMQClientFactory = NULL; + m_pPullAPIWrapper = NULL; + m_pMessageListenerInner = NULL; + m_pOffsetStore = NULL; + m_pRebalanceImpl = new RebalancePushImpl(this); + m_pConsumerStatManager = new ConsumerStatManager(); + m_pConsumeMessageService = NULL; +} + +DefaultMQPushConsumerImpl::~DefaultMQPushConsumerImpl() +{ + //delete m_pMessageListenerInner; + if (m_pPullAPIWrapper) + delete m_pPullAPIWrapper; + if (m_pRebalanceImpl) + delete m_pRebalanceImpl; + if (m_pConsumerStatManager) + delete m_pConsumerStatManager; + if (m_pConsumeMessageService) + delete m_pConsumeMessageService; + if (m_pOffsetStore) + delete m_pOffsetStore; + //delete m_pMQClientFactory; +} + +void DefaultMQPushConsumerImpl::start() +{ + RMQ_DEBUG("DefaultMQPushConsumerImpl::start()"); + switch (m_serviceState) + { + case CREATE_JUST: + { + RMQ_INFO("the consumer [{%s}] start beginning. messageModel={%s}", + m_pDefaultMQPushConsumer->getConsumerGroup().c_str(), + getMessageModelString(m_pDefaultMQPushConsumer->getMessageModel())); + + m_serviceState = START_FAILED; + checkConfig(); + copySubscription(); + + if (m_pDefaultMQPushConsumer->getMessageModel() == CLUSTERING) + { + m_pDefaultMQPushConsumer->changeInstanceNameToPID(); + } + + m_pMQClientFactory = MQClientManager::getInstance()->getAndCreateMQClientFactory(*m_pDefaultMQPushConsumer); + + m_pRebalanceImpl->setConsumerGroup(m_pDefaultMQPushConsumer->getConsumerGroup()); + m_pRebalanceImpl->setMessageModel(m_pDefaultMQPushConsumer->getMessageModel()); + m_pRebalanceImpl->setAllocateMessageQueueStrategy(m_pDefaultMQPushConsumer->getAllocateMessageQueueStrategy()); + m_pRebalanceImpl->setmQClientFactory(m_pMQClientFactory); + + m_pPullAPIWrapper = new PullAPIWrapper(m_pMQClientFactory, m_pDefaultMQPushConsumer->getConsumerGroup()); + + if (m_pDefaultMQPushConsumer->getOffsetStore() != NULL) + { + m_pOffsetStore = m_pDefaultMQPushConsumer->getOffsetStore(); + } + else + { + switch (m_pDefaultMQPushConsumer->getMessageModel()) + { + case BROADCASTING: + m_pOffsetStore = new LocalFileOffsetStore(m_pMQClientFactory, m_pDefaultMQPushConsumer->getConsumerGroup()); + break; + case CLUSTERING: + m_pOffsetStore = new RemoteBrokerOffsetStore(m_pMQClientFactory, m_pDefaultMQPushConsumer->getConsumerGroup()); + break; + default: + break; + } + } + + m_pOffsetStore->load(); + + if (dynamic_cast<MessageListenerOrderly*>(m_pMessageListenerInner) != NULL) + { + m_consumeOrderly = true; + m_pConsumeMessageService = + new ConsumeMessageOrderlyService(this, (MessageListenerOrderly*)m_pMessageListenerInner); + } + else if (dynamic_cast<MessageListenerConcurrently*>(m_pMessageListenerInner) != NULL) + { + m_consumeOrderly = false; + m_pConsumeMessageService = + new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently*)m_pMessageListenerInner); + } + m_pConsumeMessageService->start(); + + bool registerOK = m_pMQClientFactory->registerConsumer(m_pDefaultMQPushConsumer->getConsumerGroup(), this); + if (!registerOK) + { + m_serviceState = CREATE_JUST; + m_pConsumeMessageService->shutdown(); + std::string str = "The consumer group[" + m_pDefaultMQPushConsumer->getConsumerGroup(); + str += "] has been created before, specify another name please."; + THROW_MQEXCEPTION(MQClientException, str, -1); + } + m_pMQClientFactory->start(); + + RMQ_INFO("the consumer [%s] start OK.", m_pDefaultMQPushConsumer->getConsumerGroup().c_str()); + m_serviceState = RUNNING; + } + break; + case RUNNING: + case START_FAILED: + case SHUTDOWN_ALREADY: + THROW_MQEXCEPTION(MQClientException, "The PullConsumer service state not OK, maybe started once, ", -1); + default: + break; + } + + updateTopicSubscribeInfoWhenSubscriptionChanged(); + m_pMQClientFactory->sendHeartbeatToAllBrokerWithLock(); + m_pMQClientFactory->rebalanceImmediately(); +} + + +void DefaultMQPushConsumerImpl::shutdown() +{ + RMQ_DEBUG("DefaultMQPushConsumerImpl::shutdown()"); + switch (m_serviceState) + { + case CREATE_JUST: + break; + case RUNNING: + m_pConsumeMessageService->shutdown(); + persistConsumerOffset(); + m_pMQClientFactory->unregisterConsumer(m_pDefaultMQPushConsumer->getConsumerGroup()); + m_pMQClientFactory->shutdown(); + + m_serviceState = SHUTDOWN_ALREADY; + break; + case SHUTDOWN_ALREADY: + break; + default: + break; + } +} + + + + +bool DefaultMQPushConsumerImpl::hasHook() +{ + return !m_hookList.empty(); +} + +void DefaultMQPushConsumerImpl::registerHook(ConsumeMessageHook* pHook) +{ + m_hookList.push_back(pHook); +} + +void DefaultMQPushConsumerImpl::executeHookBefore(ConsumeMessageContext& context) +{ + std::list<ConsumeMessageHook*>::iterator it = m_hookList.begin(); + for (; it != m_hookList.end(); it++) + { + try + { + (*it)->consumeMessageBefore(context); + } + catch (...) + { + RMQ_WARN("consumeMessageBefore exception"); + } + } +} + +void DefaultMQPushConsumerImpl::executeHookAfter(ConsumeMessageContext& context) +{ + std::list<ConsumeMessageHook*>::iterator it = m_hookList.begin(); + for (; it != m_hookList.end(); it++) + { + try + { + (*it)->consumeMessageAfter(context); + } + catch (...) + { + RMQ_WARN("consumeMessageAfter exception"); + } + } +} + +void DefaultMQPushConsumerImpl::createTopic(const std::string& key, const std::string& newTopic, int queueNum) +{ + m_pMQClientFactory->getMQAdminImpl()->createTopic(key, newTopic, queueNum); +} + +std::set<MessageQueue>* DefaultMQPushConsumerImpl::fetchSubscribeMessageQueues(const std::string& topic) +{ + std::map<std::string, std::set<MessageQueue> >& mqs = m_pRebalanceImpl->getTopicSubscribeInfoTable(); + std::map<std::string, std::set<MessageQueue> >::iterator it = mqs.find(topic); + + if (it == mqs.end()) + { + m_pMQClientFactory->updateTopicRouteInfoFromNameServer(topic); + mqs = m_pRebalanceImpl->getTopicSubscribeInfoTable(); + it = mqs.find(topic); + } + + if (it == mqs.end()) + { + THROW_MQEXCEPTION(MQClientException, "The topic[" + topic + "] not exist", -1); + } + + std::set<MessageQueue>* result = new std::set<MessageQueue>(it->second.begin(), it->second.end()); + return result; +} + +DefaultMQPushConsumer* DefaultMQPushConsumerImpl::getDefaultMQPushConsumer() +{ + return m_pDefaultMQPushConsumer; +} + +long long DefaultMQPushConsumerImpl::earliestMsgStoreTime(const MessageQueue& mq) +{ + return m_pMQClientFactory->getMQAdminImpl()->earliestMsgStoreTime(mq); +} + +long long DefaultMQPushConsumerImpl::maxOffset(const MessageQueue& mq) +{ + return m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq); +} + +long long DefaultMQPushConsumerImpl::minOffset(const MessageQueue& mq) +{ + return m_pMQClientFactory->getMQAdminImpl()->minOffset(mq); +} + +OffsetStore* DefaultMQPushConsumerImpl::getOffsetStore() +{ + return m_pOffsetStore; +} + +void DefaultMQPushConsumerImpl::setOffsetStore(OffsetStore* pOffsetStore) +{ + m_pOffsetStore = pOffsetStore; +} + +//MQConsumerInner +std::string DefaultMQPushConsumerImpl::groupName() +{ + return m_pDefaultMQPushConsumer->getConsumerGroup(); +} + +MessageModel DefaultMQPushConsumerImpl::messageModel() +{ + return m_pDefaultMQPushConsumer->getMessageModel(); +} + +ConsumeType DefaultMQPushConsumerImpl::consumeType() +{ + return CONSUME_PASSIVELY; +} + +ConsumeFromWhere DefaultMQPushConsumerImpl::consumeFromWhere() +{ + return m_pDefaultMQPushConsumer->getConsumeFromWhere(); +} + +std::set<SubscriptionData> DefaultMQPushConsumerImpl::subscriptions() +{ + std::set<SubscriptionData> sds; + std::map<std::string, SubscriptionData>& subscription = m_pRebalanceImpl->getSubscriptionInner(); + std::map<std::string, SubscriptionData>::iterator it = subscription.begin(); + for (; it != subscription.end(); it++) + { + sds.insert(it->second); + } + + return sds; +} + +void DefaultMQPushConsumerImpl::doRebalance() +{ + if (m_pRebalanceImpl != NULL) + { + m_pRebalanceImpl->doRebalance(); + } +} + +void DefaultMQPushConsumerImpl::persistConsumerOffset() +{ + try + { + makeSureStateOK(); + + std::set<MessageQueue> mqs; + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_pRebalanceImpl->getProcessQueueTableLock()); + std::map<MessageQueue, ProcessQueue*>& processQueueTable = m_pRebalanceImpl->getProcessQueueTable(); + RMQ_FOR_EACH(processQueueTable, it) + { + mqs.insert(it->first); + } + } + + m_pOffsetStore->persistAll(mqs); + } + catch (...) + { + RMQ_ERROR("persistConsumerOffset exception, group: %s", + m_pDefaultMQPushConsumer->getConsumerGroup().c_str()); + } +} + +void DefaultMQPushConsumerImpl::updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info) +{ + std::map<std::string, SubscriptionData>& subTable = getSubscriptionInner(); + + if (subTable.find(topic) != subTable.end()) + { + m_pRebalanceImpl->getTopicSubscribeInfoTable().insert(std::pair<std::string, std::set<MessageQueue> >(topic, info)); + } +} + +std::map<std::string, SubscriptionData>& DefaultMQPushConsumerImpl::getSubscriptionInner() +{ + return m_pRebalanceImpl->getSubscriptionInner(); +} + +bool DefaultMQPushConsumerImpl::isSubscribeTopicNeedUpdate(const std::string& topic) +{ + std::map<std::string, SubscriptionData>& subTable = getSubscriptionInner(); + + if (subTable.find(topic) != subTable.end()) + { + std::map<std::string, std::set<MessageQueue> >& mqs = + m_pRebalanceImpl->getTopicSubscribeInfoTable(); + + return mqs.find(topic) == mqs.end(); + } + + return false; +} + +bool DefaultMQPushConsumerImpl::isPause() +{ + return m_pause; +} + +void DefaultMQPushConsumerImpl::setPause(bool pause) +{ + m_pause = pause; +} + + +void DefaultMQPushConsumerImpl::correctTagsOffset(PullRequest& pullRequest) +{ + if (pullRequest.getProcessQueue()->getMsgCount().get() == 0) + { + m_pOffsetStore->updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true); + } +} + +void DefaultMQPushConsumerImpl::pullMessage(PullRequest* pPullRequest) +{ + RMQ_DEBUG("pullMessage begin: %s", pPullRequest->toString().c_str()); + + ProcessQueue* processQueue = pPullRequest->getProcessQueue(); + if (processQueue->isDropped()) + { + RMQ_WARN("the pull request[%s] is dropped.", pPullRequest->toString().c_str()); + delete pPullRequest; + return; + } + + pPullRequest->getProcessQueue()->setLastPullTimestamp(KPRUtil::GetCurrentTimeMillis()); + + try + { + makeSureStateOK(); + } + catch (const MQException& e) + { + RMQ_WARN("pullMessage exception [%s], consumer state not ok", e.what()); + executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenException); + return; + } + + if (isPause()) + { + RMQ_WARN("consumer was paused, execute pull request later. instanceName={%s}", + m_pDefaultMQPushConsumer->getInstanceName().c_str()); + executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenSuspend); + return; + } + + long size = processQueue->getMsgCount().get(); + if (size > m_pDefaultMQPushConsumer->getPullThresholdForQueue()) + { + executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenFlowControl); + if ((flowControlTimes1++ % 3000) == 0) + { + RMQ_WARN("the consumer message buffer is full, so do flow control, {%ld} {%s} {%lld}", size, + pPullRequest->toString().c_str(), flowControlTimes1); + } + return; + } + + if (!m_consumeOrderly) + { + if (processQueue->getMaxSpan() > m_pDefaultMQPushConsumer->getConsumeConcurrentlyMaxSpan()) + { + executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenFlowControl); + if ((flowControlTimes2++ % 3000) == 0) + { + RMQ_WARN("the queue's messages, span too long, so do flow control, size: {%ld}, pullRequest: {%s}, times: {%lld}, maxspan: {%lld}", + size, pPullRequest->toString().c_str(), flowControlTimes2, processQueue->getMaxSpan()); + } + return; + } + } + + std::map<std::string, SubscriptionData>& subTable = getSubscriptionInner(); + std::string topic = pPullRequest->getMessageQueue().getTopic(); + std::map<std::string, SubscriptionData>::iterator it = subTable.find(topic); + if (it == subTable.end()) + { + executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenException); + RMQ_WARN("find the consumer's subscription failed, {%s}", pPullRequest->toString().c_str()); + return; + } + + SubscriptionData subscriptionData = it->second; + PullCallback* pullCallback = new DefaultMQPushConsumerImplCallback(subTable[topic], this, pPullRequest); + + bool commitOffsetEnable = false; + long commitOffsetValue = 0L; + if (CLUSTERING == m_pDefaultMQPushConsumer->getMessageModel()) + { + commitOffsetValue = m_pOffsetStore->readOffset(pPullRequest->getMessageQueue(), + READ_FROM_MEMORY); + if (commitOffsetValue > 0) + { + commitOffsetEnable = true; + } + } + + int sysFlag = PullSysFlag::buildSysFlag( + commitOffsetEnable, // commitOffset + true, // suspend + false// subscription + ); + try + { + m_pPullAPIWrapper->pullKernelImpl( + pPullRequest->getMessageQueue(), // 1 + "", // 2 + subscriptionData.getSubVersion(), // 3 + pPullRequest->getNextOffset(), // 4 + m_pDefaultMQPushConsumer->getPullBatchSize(), // 5 + sysFlag, // 6 + commitOffsetValue,// 7 + s_BrokerSuspendMaxTimeMillis, // 8 + s_ConsumerTimeoutMillisWhenSuspend, // 9 + ASYNC, // 10 + pullCallback// 11 + ); + } + catch (...) + { + RMQ_ERROR("pullKernelImpl exception"); + executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenException); + } + + RMQ_DEBUG("pullMessage end"); +} + +void DefaultMQPushConsumerImpl::executePullRequestImmediately(PullRequest* pullRequest) +{ + m_pMQClientFactory->getPullMessageService()->executePullRequestImmediately(pullRequest); +} + +void DefaultMQPushConsumerImpl::executePullRequestLater(PullRequest* pullRequest, long timeDelay) +{ + m_pMQClientFactory->getPullMessageService()->executePullRequestLater(pullRequest, timeDelay); +} + +void DefaultMQPushConsumerImpl::executeTaskLater(kpr::TimerHandler* handler, long timeDelay) +{ + m_pMQClientFactory->getPullMessageService()->executeTaskLater(handler, timeDelay); +} + + +void DefaultMQPushConsumerImpl::makeSureStateOK() +{ + if (m_serviceState != RUNNING) + { + THROW_MQEXCEPTION(MQClientException, "The consumer service state not OK, ", -1); + } +} + +ConsumerStatManager* DefaultMQPushConsumerImpl::getConsumerStatManager() +{ + return m_pConsumerStatManager; +} + +QueryResult DefaultMQPushConsumerImpl::queryMessage(const std::string& topic, + const std::string& key, + int maxNum, + long long begin, + long long end) +{ + return m_pMQClientFactory->getMQAdminImpl()->queryMessage(topic, key, maxNum, begin, end); +} + +void DefaultMQPushConsumerImpl::registerMessageListener(MessageListener* pMessageListener) +{ + m_pMessageListenerInner = pMessageListener; +} + +void DefaultMQPushConsumerImpl::resume() +{ + m_pause = false; +} + +long long DefaultMQPushConsumerImpl::searchOffset(const MessageQueue& mq, long long timestamp) +{ + return m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp); +} + +void DefaultMQPushConsumerImpl::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName) +{ + try + { + std::string brokerAddr = brokerName.empty() ? + socketAddress2IPPort(msg.getStoreHost()) : m_pMQClientFactory->findBrokerAddressInPublish(brokerName); + + m_pMQClientFactory->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, + m_pDefaultMQPushConsumer->getConsumerGroup(), + delayLevel, + 5000); + } + catch (...) + { + RMQ_ERROR("sendMessageBack Exception, group: %s", m_pDefaultMQPushConsumer->getConsumerGroup().c_str()); + Message newMsg(MixAll::getRetryTopic(m_pDefaultMQPushConsumer->getConsumerGroup()), + msg.getBody(), msg.getBodyLen()); + + std::string originMsgId = msg.getProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID); + newMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, UtilAll::isBlank(originMsgId) ? msg.getMsgId() + : originMsgId); + + newMsg.setFlag(msg.getFlag()); + newMsg.setProperties(msg.getProperties()); + newMsg.putProperty(Message::PROPERTY_RETRY_TOPIC, msg.getTopic()); + + int reTimes = msg.getReconsumeTimes() + 1; + newMsg.putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(reTimes)); + newMsg.putProperty(Message::PROPERTY_MAX_RECONSUME_TIMES, UtilAll::toString(m_pDefaultMQPushConsumer->getMaxReconsumeTimes())); + newMsg.setDelayTimeLevel(3 + reTimes); + + m_pMQClientFactory->getDefaultMQProducer()->send(newMsg); + } +} + +void DefaultMQPushConsumerImpl::checkConfig() +{ + // consumerGroup check + Validators::checkGroup(m_pDefaultMQPushConsumer->getConsumerGroup()); + + // consumerGroup + if (m_pDefaultMQPushConsumer->getConsumerGroup() == MixAll::DEFAULT_CONSUMER_GROUP) + { + THROW_MQEXCEPTION(MQClientException, "consumerGroup can not equal " + + MixAll::DEFAULT_CONSUMER_GROUP // + + ", please specify another one.", -1); + } + + if (m_pDefaultMQPushConsumer->getMessageModel() != BROADCASTING + && m_pDefaultMQPushConsumer->getMessageModel() != CLUSTERING) + { + THROW_MQEXCEPTION(MQClientException, "messageModel is invalid ", -1); + } + + // allocateMessageQueueStrategy + if (m_pDefaultMQPushConsumer->getAllocateMessageQueueStrategy() == NULL) + { + THROW_MQEXCEPTION(MQClientException, "allocateMessageQueueStrategy is null", -1); + } + + // consumeFromWhereOffset + if (m_pDefaultMQPushConsumer->getConsumeFromWhere() < CONSUME_FROM_LAST_OFFSET + || m_pDefaultMQPushConsumer->getConsumeFromWhere() > CONSUME_FROM_MAX_OFFSET) + { + THROW_MQEXCEPTION(MQClientException, "consumeFromWhere is invalid", -1); + } + + // subscription + /* + if (m_pDefaultMQPushConsumer->getSubscription().size() == 0) + { + THROW_MQEXCEPTION(MQClientException,"subscription is null" ,-1); + } + */ + + // messageListener + if (m_pDefaultMQPushConsumer->getMessageListener() == NULL) + { + THROW_MQEXCEPTION(MQClientException, "messageListener is null", -1); + } + + MessageListener* listener = m_pDefaultMQPushConsumer->getMessageListener(); + MessageListener* orderly = (dynamic_cast<MessageListenerOrderly*>(listener)) ; + MessageListener* concurrently = (dynamic_cast<MessageListenerConcurrently*>(listener)) ; + + if (!orderly && !concurrently) + { + THROW_MQEXCEPTION(MQClientException, + "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently" , + -1); + } + + // consumeThreadMin + if (m_pDefaultMQPushConsumer->getConsumeThreadMin() < 1 + || m_pDefaultMQPushConsumer->getConsumeThreadMin() > 1000 + || m_pDefaultMQPushConsumer->getConsumeThreadMin() > m_pDefaultMQPushConsumer->getConsumeThreadMax() + ) + { + THROW_MQEXCEPTION(MQClientException, "consumeThreadMin Out of range [1, 1000]", -1); + } + + // consumeThreadMax + if (m_pDefaultMQPushConsumer->getConsumeThreadMax() < 1 + || m_pDefaultMQPushConsumer->getConsumeThreadMax() > 1000) + { + THROW_MQEXCEPTION(MQClientException, "consumeThreadMax Out of range [1, 1000]", -1); + } + + // consumeConcurrentlyMaxSpan + if (m_pDefaultMQPushConsumer->getConsumeConcurrentlyMaxSpan() < 1 + || m_pDefaultMQPushConsumer->getConsumeConcurrentlyMaxSpan() > 65535) + { + THROW_MQEXCEPTION(MQClientException, "consumeConcurrentlyMaxSpan Out of range [1, 65535]" , -1); + } + + // pullThresholdForQueue + if (m_pDefaultMQPushConsumer->getPullThresholdForQueue() < 1 + || m_pDefaultMQPushConsumer->getPullThresholdForQueue() > 65535) + { + THROW_MQEXCEPTION(MQClientException, "pullThresholdForQueue Out of range [1, 65535]", -1); + } + + // pullInterval + if (m_pDefaultMQPushConsumer->getPullInterval() < 0 + || m_pDefaultMQPushConsumer->getPullInterval() > 65535) + { + THROW_MQEXCEPTION(MQClientException, "pullInterval Out of range [0, 65535]", -1); + } + + // consumeMessageBatchMaxSize + if (m_pDefaultMQPushConsumer->getConsumeMessageBatchMaxSize() < 1 + || m_pDefaultMQPushConsumer->getConsumeMessageBatchMaxSize() > 1024) + { + THROW_MQEXCEPTION(MQClientException, "consumeMessageBatchMaxSize Out of range [1, 1024]", -1); + } + + // pullBatchSize + if (m_pDefaultMQPushConsumer->getPullBatchSize() < 1 + || m_pDefaultMQPushConsumer->getPullBatchSize() > 1024) + { + THROW_MQEXCEPTION(MQClientException, "pullBatchSize Out of range [1, 1024]", -1); + } +} + +void DefaultMQPushConsumerImpl::copySubscription() +{ + try + { + std::map<std::string, std::string>& sub = m_pDefaultMQPushConsumer->getSubscription(); + std::map<std::string, std::string>::iterator it = sub.begin(); + for (; it != sub.end(); it++) + { + SubscriptionDataPtr subscriptionData = FilterAPI::buildSubscriptionData(it->first, it->second); + m_pRebalanceImpl->getSubscriptionInner()[it->first] = *subscriptionData; + } + + if (m_pMessageListenerInner == NULL) + { + m_pMessageListenerInner = m_pDefaultMQPushConsumer->getMessageListener(); + } + + switch (m_pDefaultMQPushConsumer->getMessageModel()) + { + case BROADCASTING: + break; + case CLUSTERING: + { + std::string retryTopic = MixAll::getRetryTopic(m_pDefaultMQPushConsumer->getConsumerGroup()); + SubscriptionDataPtr subscriptionData = + FilterAPI::buildSubscriptionData(retryTopic, SubscriptionData::SUB_ALL); + m_pRebalanceImpl->getSubscriptionInner()[retryTopic] = *subscriptionData; + } + + break; + default: + break; + } + } + catch (...) + { + THROW_MQEXCEPTION(MQClientException, "subscription exception", -1); + } +} + +void DefaultMQPushConsumerImpl::updateTopicSubscribeInfoWhenSubscriptionChanged() +{ + std::map<std::string, SubscriptionData> subTable = getSubscriptionInner(); + std::map<std::string, SubscriptionData>::iterator it = subTable.begin(); + for (; it != subTable.end(); it++) + { + m_pMQClientFactory->updateTopicRouteInfoFromNameServer(it->first); + } +} + +MessageListener* DefaultMQPushConsumerImpl::getMessageListenerInner() +{ + return m_pMessageListenerInner; +} + +void DefaultMQPushConsumerImpl::subscribe(const std::string& topic, const std::string& subExpression) +{ + try + { + SubscriptionDataPtr subscriptionData = FilterAPI::buildSubscriptionData(topic, subExpression); + m_pRebalanceImpl->getSubscriptionInner()[topic] = *subscriptionData; + + if (m_pMQClientFactory) + { + m_pMQClientFactory->sendHeartbeatToAllBrokerWithLock(); + } + } + catch (...) + { + THROW_MQEXCEPTION(MQClientException, "subscription exception", -1); + } +} + +void DefaultMQPushConsumerImpl::suspend() +{ + m_pause = true; +} + +void DefaultMQPushConsumerImpl::unsubscribe(const std::string& topic) +{ + m_pRebalanceImpl->getSubscriptionInner().erase(topic); +} + +void DefaultMQPushConsumerImpl::updateConsumeOffset(MessageQueue& mq, long long offset) +{ + m_pOffsetStore->updateOffset(mq, offset, false); +} + +void DefaultMQPushConsumerImpl::updateCorePoolSize(int corePoolSize) +{ + m_pConsumeMessageService->updateCorePoolSize(corePoolSize); +} + +MessageExt* DefaultMQPushConsumerImpl::viewMessage(const std::string& msgId) +{ + return m_pMQClientFactory->getMQAdminImpl()->viewMessage(msgId); +} + +RebalanceImpl* DefaultMQPushConsumerImpl::getRebalanceImpl() +{ + return m_pRebalanceImpl; +} + +bool DefaultMQPushConsumerImpl::isConsumeOrderly() +{ + return m_consumeOrderly; +} + +void DefaultMQPushConsumerImpl::setConsumeOrderly(bool consumeOrderly) +{ + m_consumeOrderly = consumeOrderly; +} + + +MQClientFactory* DefaultMQPushConsumerImpl::getmQClientFactory() +{ + return m_pMQClientFactory; +} + +void DefaultMQPushConsumerImpl::setmQClientFactory(MQClientFactory* mQClientFactory) +{ + m_pMQClientFactory = mQClientFactory; +} + + +ServiceState DefaultMQPushConsumerImpl::getServiceState() +{ + return m_serviceState; +} + +void DefaultMQPushConsumerImpl::setServiceState(ServiceState serviceState) +{ + m_serviceState = serviceState; +} + +} +
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h new file mode 100755 index 0000000..5370586 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h @@ -0,0 +1,169 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __DEFAULTMQPUSHCONSUMERIMPL_H__ +#define __DEFAULTMQPUSHCONSUMERIMPL_H__ + +#include <string> +#include <set> +#include <map> + +#include "MQConsumerInner.h" +#include "MessageExt.h" +#include "QueryResult.h" +#include "ServiceState.h" +#include "PullResult.h" +#include "ConsumeMessageHook.h" +#include "MixAll.h" +#include "PullCallback.h" +#include "TimerThread.h" + +namespace rmq +{ + class DefaultMQPushConsumer; + class ConsumeMessageHook; + class OffsetStore; + class RebalanceImpl; + class ConsumerStatManager; + class ConsumeMessageService; + class MessageListener; + class PullRequest; + class MQClientFactory; + class PullAPIWrapper; + class PullMessageService; + class DefaultMQPushConsumerImplCallback; + class MQException; + + /** + * Push Consumer Impl + * + */ + class DefaultMQPushConsumerImpl : public MQConsumerInner + { + public: + DefaultMQPushConsumerImpl(DefaultMQPushConsumer* pDefaultMQPushConsumer); + ~DefaultMQPushConsumerImpl(); + + void start(); + void suspend(); + void resume(); + void shutdown(); + bool isPause(); + void setPause(bool pause); + + bool hasHook(); + void registerHook(ConsumeMessageHook* pHook); + void executeHookBefore(ConsumeMessageContext& context); + void executeHookAfter(ConsumeMessageContext& context); + + void createTopic(const std::string& key, const std::string& newTopic, int queueNum); + std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic); + + long long earliestMsgStoreTime(const MessageQueue& mq); + long long maxOffset(const MessageQueue& mq); + long long minOffset(const MessageQueue& mq); + OffsetStore* getOffsetStore() ; + void setOffsetStore(OffsetStore* pOffsetStore); + + //MQConsumerInner + std::string groupName() ; + MessageModel messageModel() ; + ConsumeType consumeType(); + ConsumeFromWhere consumeFromWhere(); + std::set<SubscriptionData> subscriptions(); + void doRebalance() ; + void persistConsumerOffset() ; + void updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info); + std::map<std::string, SubscriptionData>& getSubscriptionInner() ; + bool isSubscribeTopicNeedUpdate(const std::string& topic); + + MessageExt* viewMessage(const std::string& msgId); + QueryResult queryMessage(const std::string& topic, + const std::string& key, + int maxNum, + long long begin, + long long end); + + void registerMessageListener(MessageListener* pMessageListener); + long long searchOffset(const MessageQueue& mq, long long timestamp); + void sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName); + + void subscribe(const std::string& topic, const std::string& subExpression); + void unsubscribe(const std::string& topic); + + void updateConsumeOffset(MessageQueue& mq, long long offset); + void updateCorePoolSize(int corePoolSize); + bool isConsumeOrderly(); + void setConsumeOrderly(bool consumeOrderly); + + RebalanceImpl* getRebalanceImpl() ; + MessageListener* getMessageListenerInner(); + DefaultMQPushConsumer* getDefaultMQPushConsumer() ; + ConsumerStatManager* getConsumerStatManager(); + + MQClientFactory* getmQClientFactory(); + void setmQClientFactory(MQClientFactory* mQClientFactory); + + ServiceState getServiceState(); + void setServiceState(ServiceState serviceState); + + private: + void correctTagsOffset(PullRequest& pullRequest) ; + + void pullMessage(PullRequest* pPullRequest); + + + void executePullRequestImmediately(PullRequest* pullRequest); + + + void executePullRequestLater(PullRequest* pullRequest, long timeDelay); + void executeTaskLater(kpr::TimerHandler* handler, long timeDelay); + + void makeSureStateOK(); + void checkConfig(); + void copySubscription() ; + void updateTopicSubscribeInfoWhenSubscriptionChanged(); + + private: + static const int s_PullTimeDelayMillsWhenException = 3000; + static const int s_PullTimeDelayMillsWhenFlowControl = 50; + static const int s_PullTimeDelayMillsWhenSuspend = 1000; + static const int s_BrokerSuspendMaxTimeMillis = 15000; + static const int s_ConsumerTimeoutMillisWhenSuspend = 30000; + + long long flowControlTimes1; + long long flowControlTimes2; + ServiceState m_serviceState; + volatile bool m_pause; + bool m_consumeOrderly; + DefaultMQPushConsumer* m_pDefaultMQPushConsumer; + MQClientFactory* m_pMQClientFactory; + PullAPIWrapper* m_pPullAPIWrapper; + MessageListener* m_pMessageListenerInner; + OffsetStore* m_pOffsetStore; + RebalanceImpl* m_pRebalanceImpl; + ConsumerStatManager* m_pConsumerStatManager; + ConsumeMessageService* m_pConsumeMessageService; + + std::list<ConsumeMessageHook*> m_hookList; + friend class PullMessageService; + friend class RebalancePushImpl; + friend class DefaultMQPushConsumerImplCallback; + }; +} + +#endif + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp b/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp new file mode 100755 index 0000000..40e9d65 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp @@ -0,0 +1,257 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "LocalFileOffsetStore.h" + +#include "MQClientFactory.h" +#include "OffsetSerializeWrapper.h" +#include "ScopedLock.h" +#include "FileUtil.h" +#include "MixAll.h" +#include "Exception.h" +#include "MQClientException.h" + +namespace rmq +{ + +LocalFileOffsetStore::LocalFileOffsetStore(MQClientFactory* pMQClientFactory, + const std::string& groupName) +{ + m_pMQClientFactory = pMQClientFactory; + m_groupName = groupName; + std::string homePath = getenv("HOME"); + m_storePath = homePath + "/.rocketmq_offsets/" + m_pMQClientFactory->getClientId() + + "/" + m_groupName + "/offsets.json"; +} + +void LocalFileOffsetStore::load() +{ + OffsetSerializeWrapperPtr offsetSerializeWrapper = this->readLocalOffset(); + if (offsetSerializeWrapper.ptr() != NULL + && offsetSerializeWrapper->getOffsetTable().size() > 0) + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex); + m_offsetTable = offsetSerializeWrapper->getOffsetTable(); + RMQ_FOR_EACH(m_offsetTable, it) + { + const MessageQueue& mq = it->first; + const kpr::AtomicLong& offset = it->second; + RMQ_INFO("load consumer's offset, {%s} {%s} {%lld}", + m_groupName.c_str(), + mq.toString().c_str(), + offset.get()); + } + } +} + + +void LocalFileOffsetStore::updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly) +{ + RMQ_DEBUG("updateOffset, MQ:%s, offset:%lld", mq.toString().c_str(), offset); + kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex); + typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq); + if (it == m_offsetTable.end()) + { + m_offsetTable[mq] = offset; + it = m_offsetTable.find(mq); + } + + kpr::AtomicLong& offsetOld = it->second; + if (increaseOnly) + { + MixAll::compareAndIncreaseOnly(offsetOld, offset); + } + else + { + offsetOld.set(offset); + } +} + +long long LocalFileOffsetStore::readOffset(const MessageQueue& mq, ReadOffsetType type) +{ + RMQ_DEBUG("readOffset, MQ:%s, type:%d", mq.toString().c_str(), type); + switch (type) + { + case MEMORY_FIRST_THEN_STORE: + case READ_FROM_MEMORY: + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex); + typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq); + if (it != m_offsetTable.end()) + { + return it->second.get(); + } + else if (READ_FROM_MEMORY == type) + { + RMQ_WARN("No offset in memory, MQ:%s", mq.toString().c_str()); + return -1; + } + } + case READ_FROM_STORE: + { + OffsetSerializeWrapperPtr offsetSerializeWrapper; + try + { + offsetSerializeWrapper = this->readLocalOffset(); + } + catch (std::exception& e) + { + RMQ_WARN("load offset file fail, MQ:%s, exception:%s", mq.toString().c_str(), e.what()); + return -1; + } + + if (offsetSerializeWrapper.ptr() != NULL) + { + std::map<MessageQueue, kpr::AtomicLong>& offsetTable = offsetSerializeWrapper->getOffsetTable(); + typeof(offsetTable.begin()) it = offsetTable.find(mq); + if (it != offsetTable.end()) + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex); + m_offsetTable[mq] = it->second.get(); + return it->second.get(); + } + } + return -1; + } + default: + break; + } + + return -1; +} + + +void LocalFileOffsetStore::persistAll(std::set<MessageQueue>& mqs) +{ + RMQ_DEBUG("persistAll, mqs.size={%u}, mqs=%s", + (unsigned)mqs.size(), UtilAll::toString(mqs).c_str()); + if (mqs.empty()) + { + return; + } + RMQ_DEBUG("persistAll, m_offsetTable.size={%u}, m_offsetTable=%s", + (unsigned)m_offsetTable.size(), UtilAll::toString(m_offsetTable).c_str()); + + OffsetSerializeWrapper offsetSerializeWrapper; + std::map<MessageQueue, kpr::AtomicLong>& offsetTable = offsetSerializeWrapper.getOffsetTable(); + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex); + RMQ_FOR_EACH(m_offsetTable, it) + { + MessageQueue mq = it->first; + kpr::AtomicLong& offset = it->second; + if (mqs.find(mq) != mqs.end()) + { + offsetTable[mq] = offset; + } + } + } + + RMQ_DEBUG("persistAll, offsetTable.size={%u}, offsetTable=%s", + (unsigned)offsetTable.size(), UtilAll::toString(offsetTable).c_str()); + + std::string jsonString; + offsetSerializeWrapper.encode(jsonString); + RMQ_DEBUG("persistAll, json=%s", jsonString.c_str()); + + if (!jsonString.empty()) + { + try + { + kpr::FileUtil::makeDirRecursive(kpr::FileUtil::extractFilePath(m_storePath)); + MixAll::string2File(m_storePath, jsonString); + } + catch (const std::exception& e) + { + RMQ_ERROR("persistAll consumer offset Exception, %s, %s", m_storePath.c_str(), e.what()); + } + } +} + +void LocalFileOffsetStore::persist(const MessageQueue& mq) +{ +} + +void LocalFileOffsetStore::removeOffset(const MessageQueue& mq) +{ +} + + +std::map<MessageQueue, long long> LocalFileOffsetStore::cloneOffsetTable(const std::string& topic) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex); + std::map<MessageQueue, long long> cloneOffsetTable; + RMQ_FOR_EACH(m_offsetTable, it) + { + MessageQueue mq = it->first; + kpr::AtomicLong& offset = it->second; + if (topic == mq.getTopic()) + { + cloneOffsetTable[mq] = offset.get(); + } + } + + return cloneOffsetTable; +} + + +OffsetSerializeWrapper* LocalFileOffsetStore::readLocalOffset() +{ + std::string content = MixAll::file2String(m_storePath); + if (content.length() == 0) + { + return this->readLocalOffsetBak(); + } + else + { + OffsetSerializeWrapper* offsetSerializeWrapper = NULL; + try + { + offsetSerializeWrapper = OffsetSerializeWrapper::decode(content.c_str(), content.size()); + } + catch (const MQException& e) + { + RMQ_WARN("readLocalOffset Exception, and try to correct, %s", e.what()); + return this->readLocalOffsetBak(); + } + + return offsetSerializeWrapper; + } +} + + +OffsetSerializeWrapper* LocalFileOffsetStore::readLocalOffsetBak() +{ + std::string content = MixAll::file2String(m_storePath + ".bak"); + if (content.length() > 0) + { + OffsetSerializeWrapper* offsetSerializeWrapper = NULL; + try + { + offsetSerializeWrapper = OffsetSerializeWrapper::decode(content.c_str(), content.size()); + } + catch (const MQException& e) + { + RMQ_WARN("readLocalOffset Exception, maybe json content invalid, %s", e.what()); + } + + return offsetSerializeWrapper; + } + + return NULL; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h b/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h new file mode 100755 index 0000000..c4efb76 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h @@ -0,0 +1,61 @@ +/** +* Copyright (C) 2013 suwenkuang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __LOCALFILEOFFSETSTORE_H__ +#define __LOCALFILEOFFSETSTORE_H__ +#include <map> +#include <string> +#include <set> + +#include "RocketMQClient.h" +#include "OffsetStore.h" +#include "MessageQueue.h" +#include "AtomicValue.h" +#include "Mutex.h" + +namespace rmq +{ + class MQClientFactory; + class MessageQueue; + class OffsetSerializeWrapper; + + class LocalFileOffsetStore : public OffsetStore + { + public: + LocalFileOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName); + + void load(); + void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly); + long long readOffset(const MessageQueue& mq, ReadOffsetType type); + void persistAll(std::set<MessageQueue>& mqs); + void persist(const MessageQueue& mq); + void removeOffset(const MessageQueue& mq) ; + std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic); + + private: + OffsetSerializeWrapper* readLocalOffset(); + OffsetSerializeWrapper* readLocalOffsetBak(); + + private: + MQClientFactory* m_pMQClientFactory; + std::string m_groupName; + std::string m_storePath; + std::map<MessageQueue, kpr::AtomicLong> m_offsetTable; + kpr::RWMutex m_tableMutex; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/MQConsumerInner.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/MQConsumerInner.h b/rocketmq-client4cpp/src/consumer/MQConsumerInner.h new file mode 100755 index 0000000..ed83621 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/MQConsumerInner.h @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __MQCONSUMERINNER_H__ +#define __MQCONSUMERINNER_H__ + +#include <string> +#include <set> + +#include "ConsumeType.h" +#include "SubscriptionData.h" + +namespace rmq +{ + class MessageQueue; + + class MQConsumerInner + { + public: + virtual ~MQConsumerInner() {} + virtual std::string groupName() = 0; + virtual MessageModel messageModel() = 0; + virtual ConsumeType consumeType() = 0; + virtual ConsumeFromWhere consumeFromWhere() = 0; + virtual std::set<SubscriptionData> subscriptions() = 0; + virtual void doRebalance() = 0; + virtual void persistConsumerOffset() = 0; + virtual void updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info) = 0; + virtual bool isSubscribeTopicNeedUpdate(const std::string& topic) = 0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/MessageQueueLock.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/MessageQueueLock.h b/rocketmq-client4cpp/src/consumer/MessageQueueLock.h new file mode 100755 index 0000000..65af99e --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/MessageQueueLock.h @@ -0,0 +1,68 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __MESSAGEQUEUELOCK_H__ +#define __MESSAGEQUEUELOCK_H__ + +#include <map> +#include "Mutex.h" +#include "ScopedLock.h" +#include "MessageQueue.h" + +namespace rmq +{ + class MessageQueueLock + { + public: + MessageQueueLock() + { + + } + + ~MessageQueueLock() + { + std::map<MessageQueue, kpr::Mutex*>::iterator it = m_mqLockTable.begin(); + + for (; it != m_mqLockTable.end(); it++) + { + delete it->second; + } + } + + kpr::Mutex* fetchLockObject(MessageQueue& mq) + { + kpr::ScopedLock<kpr::Mutex> lock(m_lock); + std::map<MessageQueue, kpr::Mutex*>::iterator it = m_mqLockTable.find(mq); + kpr::Mutex* objLock; + if (it == m_mqLockTable.end()) + { + objLock = new kpr::Mutex(); + m_mqLockTable[mq] = objLock; + } + else + { + objLock = it->second; + } + + return objLock; + } + + private: + std::map<MessageQueue, kpr::Mutex*> m_mqLockTable; + kpr::Mutex m_lock; + }; +} +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp b/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp new file mode 100755 index 0000000..f90e502 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp @@ -0,0 +1,445 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "ProcessQueue.h" +#include "MessageExt.h" +#include "KPRUtil.h" +#include "UtilAll.h" +#include "ScopedLock.h" +#include "DefaultMQPushConsumer.h" +#include "DefaultMQPushConsumerImpl.h" + +namespace rmq +{ + +ProcessQueue::ProcessQueue() +{ + m_lastPullTimestamp = KPRUtil::GetCurrentTimeMillis(); + m_lastConsumeTimestamp = KPRUtil::GetCurrentTimeMillis(); + m_queueOffsetMax = 0L; + m_msgCount = 0; + m_dropped = false; + m_locked = false; + m_lastLockTimestamp = KPRUtil::GetCurrentTimeMillis(); + m_consuming = false; +} + +bool ProcessQueue::isLockExpired() +{ + bool result = (KPRUtil::GetCurrentTimeMillis() - m_lastLockTimestamp) > + s_RebalanceLockMaxLiveTime; + return result; +} + +bool ProcessQueue::isPullExpired() +{ + bool result = (KPRUtil::GetCurrentTimeMillis() - m_lastPullTimestamp) > + s_PullMaxIdleTime; + return result; +} + + +void ProcessQueue::cleanExpiredMsg(DefaultMQPushConsumer* pPushConsumer) +{ + if (pPushConsumer->getDefaultMQPushConsumerImpl()->isConsumeOrderly()) + { + return; + } + + long long now = KPRUtil::GetCurrentTimeMillis(); + int loop = m_msgTreeMap.size() < 16 ? m_msgTreeMap.size() : 16; + for (int i = 0; i < loop; i++) + { + MessageExt* msg = NULL; + try + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_lockTreeMap); + if (m_msgTreeMap.empty()) + { + return; + } + + MessageExt* firstMsg = m_msgTreeMap.begin()->second; + long long startTimestamp = UtilAll::str2ll(firstMsg->getProperty(Message::PROPERTY_CONSUME_START_TIMESTAMP).c_str()); + if (startTimestamp > 0 && (now - startTimestamp) > (pPushConsumer->getConsumeTimeout() * 60 * 1000)) + { + msg = firstMsg; + } + else + { + return; + } + } + catch (...) + { + RMQ_ERROR("getExpiredMsg exception"); + } + + try + { + pPushConsumer->sendMessageBack((*msg), 3); + RMQ_WARN("send expire msg back. topic={%s}, msgId={%s}, storeHost={%s}, queueId={%d}, queueOffset={%lld}", + msg->getTopic().c_str(), msg->getMsgId().c_str(), msg->getStoreHostString().c_str(), + msg->getQueueId(), msg->getQueueOffset()); + + try + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); + if (!m_msgTreeMap.empty() && msg->getQueueOffset() == m_msgTreeMap.begin()->first) + { + try + { + m_msgTreeMap.erase(m_msgTreeMap.begin()); + m_msgCount -= 1; + // if free msg, may be coredump + //delete msg; + } + catch (...) + { + RMQ_ERROR("send expired msg exception"); + } + } + + } + catch (...) + { + RMQ_ERROR("delExpiredMsg exception"); + } + } + catch (...) + { + RMQ_ERROR("send expired msg exception"); + } + } +} + + +bool ProcessQueue::putMessage(const std::list<MessageExt *> &msgs) +{ + bool dispathToConsume = false; + + try + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); + int validMsgCnt = 0; + std::list<MessageExt *>::const_iterator it = msgs.begin(); + + for (; it != msgs.end(); it++) + { + MessageExt *msg = (*it); + + if (m_msgTreeMap.find(msg->getQueueOffset()) == m_msgTreeMap.end()) + { + validMsgCnt++; + m_queueOffsetMax = msg->getQueueOffset(); + } + + m_msgTreeMap[msg->getQueueOffset()] = msg; + } + + m_msgCount += validMsgCnt; + + if (!m_msgTreeMap.empty() && !m_consuming) + { + dispathToConsume = true; + m_consuming = true; + } + } + catch (...) + { + RMQ_ERROR("putMessage exception"); + } + + return dispathToConsume; +} + +long long ProcessQueue::getMaxSpan() +{ + try + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_lockTreeMap); + + if (!m_msgTreeMap.empty()) + { + std::map<long long, MessageExt *>::iterator it1 = m_msgTreeMap.begin(); + std::map<long long, MessageExt *>::iterator it2 = m_msgTreeMap.end(); + it2--; + return it2->first - it1->first; + } + } + catch (...) + { + RMQ_ERROR("getMaxSpan exception"); + } + + return 0; +} + +long long ProcessQueue::removeMessage(std::list<MessageExt *> &msgs) +{ + long long result = -1; + unsigned long long now = KPRUtil::GetCurrentTimeMillis(); + + try + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); + m_lastConsumeTimestamp = now; + + if (!m_msgTreeMap.empty()) + { + result = m_queueOffsetMax + 1; + int removedCnt = 0; + std::list<MessageExt *>::iterator it = msgs.begin(); + + for (; it != msgs.end();) + { + MessageExt *msg = (*it); + + if (m_msgTreeMap.find(msg->getQueueOffset()) != m_msgTreeMap.end()) + { + removedCnt++; + } + + m_msgTreeMap.erase(msg->getQueueOffset()); + //TODO delete message? + it = msgs.erase(it); + delete msg; + } + + m_msgCount -= removedCnt; + + if (!m_msgTreeMap.empty()) + { + std::map<long long, MessageExt *>::iterator it = m_msgTreeMap.begin(); + result = it->first; + } + } + } + catch (...) + { + RMQ_ERROR("removeMessage exception"); + } + + return result; +} + + +void ProcessQueue::clear() +{ + try + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); + m_msgTreeMap.clear(); + m_msgTreeMapTemp.clear(); + m_msgCount.set(0); + m_queueOffsetMax = 0; + } + catch (...) + { + RMQ_ERROR("clear exception"); + } + + return; +} + + +std::map<long long, MessageExt *> ProcessQueue::getMsgTreeMap() +{ + return m_msgTreeMap; +} + +kpr::AtomicInteger ProcessQueue::getMsgCount() +{ + return m_msgCount; +} + +bool ProcessQueue::isDropped() +{ + return m_dropped; +} + +void ProcessQueue::setDropped(bool dropped) +{ + m_dropped = dropped; +} + +unsigned long long ProcessQueue::getLastPullTimestamp() +{ + return m_lastPullTimestamp; +} + + +void ProcessQueue::setLastPullTimestamp(unsigned long long lastPullTimestamp) +{ + m_lastPullTimestamp = lastPullTimestamp; +} + + +unsigned long long ProcessQueue::getLastConsumeTimestamp() +{ + return m_lastConsumeTimestamp; +} + + +void ProcessQueue::setLastConsumeTimestamp(unsigned long long + lastConsumeTimestamp) +{ + m_lastConsumeTimestamp = lastConsumeTimestamp; +} + + +/** +* ======================================================================== +*/ +kpr::Mutex &ProcessQueue::getLockConsume() +{ + return m_lockConsume; +} + +void ProcessQueue::setLocked(bool locked) +{ + m_locked = locked; +} + +bool ProcessQueue::isLocked() +{ + return m_locked; +} + +long long ProcessQueue::getTryUnlockTimes() +{ + return m_tryUnlockTimes.get(); +} + +void ProcessQueue::incTryUnlockTimes() +{ + m_tryUnlockTimes++; +} + + +void ProcessQueue::rollback() +{ + try + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); + m_msgTreeMap = m_msgTreeMapTemp; + m_msgTreeMapTemp.clear(); + } + catch (...) + { + RMQ_ERROR("rollback exception"); + } +} + +long long ProcessQueue::commit() +{ + try + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); + + if (!m_msgTreeMapTemp.empty()) + { + std::map<long long, MessageExt *>::iterator it = m_msgTreeMapTemp.end(); + it--; + long long offset = it->first; + m_msgCount -= m_msgTreeMapTemp.size(); + m_msgTreeMapTemp.clear(); + return offset + 1; + } + } + catch (...) + { + RMQ_ERROR("commit exception"); + } + + return -1; +} + +void ProcessQueue::makeMessageToCosumeAgain(const std::list<MessageExt *> &msgs) +{ + try + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); + std::list<MessageExt *>::const_iterator it = msgs.begin(); + + for (; it != msgs.end(); it++) + { + MessageExt *msg = (*it); + m_msgTreeMapTemp.erase(msg->getQueueOffset()); + m_msgTreeMap[msg->getQueueOffset()] = msg; + } + } + catch (...) + { + RMQ_ERROR("makeMessageToCosumeAgain exception"); + } +} + +std::list<MessageExt *> ProcessQueue::takeMessages(int batchSize) +{ + std::list<MessageExt *> result; + unsigned long long now = KPRUtil::GetCurrentTimeMillis(); + + try + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap); + m_lastConsumeTimestamp = now; + + if (!m_msgTreeMap.empty()) + { + for (int i = 0; i < batchSize; i++) + { + std::map<long long, MessageExt *>::iterator it = m_msgTreeMap.begin(); + + if (it != m_msgTreeMap.end()) + { + result.push_back(it->second); + m_msgTreeMapTemp[it->first] = it->second; + m_msgTreeMap.erase(it); + } + else + { + break; + } + } + + if (result.empty()) + { + m_consuming = false; + } + } + } + catch (...) + { + RMQ_ERROR("takeMessags exception"); + } + + return result; +} + +long long ProcessQueue::getLastLockTimestamp() +{ + return m_lastLockTimestamp; +} + +void ProcessQueue::setLastLockTimestamp(long long lastLockTimestamp) +{ + m_lastLockTimestamp = lastLockTimestamp; +} + + +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ProcessQueue.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ProcessQueue.h b/rocketmq-client4cpp/src/consumer/ProcessQueue.h new file mode 100755 index 0000000..559dd7f --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/ProcessQueue.h @@ -0,0 +1,102 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __PROCESSQUEUE_H__ +#define __PROCESSQUEUE_H__ + +#include <list> +#include <map> +#include "Mutex.h" +#include "AtomicValue.h" + +namespace rmq +{ + class MessageExt; + class DefaultMQPushConsumer; + + class ProcessQueue + { + public: + static const unsigned int s_RebalanceLockMaxLiveTime = 30000; + static const unsigned int s_RebalanceLockInterval = 20000; + static const unsigned int s_PullMaxIdleTime = 120000; + + public: + ProcessQueue(); + + bool isLockExpired(); + bool isPullExpired(); + + void cleanExpiredMsg(DefaultMQPushConsumer* pPushConsumer); + bool putMessage(const std::list<MessageExt*>& msgs); + + long long getMaxSpan(); + long long removeMessage(std::list<MessageExt*>& msgs); + + void clear(); + + std::map<long long, MessageExt*> getMsgTreeMap(); + kpr::AtomicInteger getMsgCount(); + bool isDropped(); + void setDropped(bool dropped); + + unsigned long long getLastPullTimestamp(); + void setLastPullTimestamp(unsigned long long lastPullTimestamp); + + unsigned long long getLastConsumeTimestamp(); + void setLastConsumeTimestamp(unsigned long long lastConsumeTimestamp); + + /** + * ======================================================================== + */ + kpr::Mutex& getLockConsume(); + void setLocked(bool locked); + bool isLocked(); + long long getTryUnlockTimes(); + void incTryUnlockTimes(); + + void rollback(); + long long commit(); + void makeMessageToCosumeAgain(const std::list<MessageExt*>& msgs); + + std::list<MessageExt*> takeMessages(int batchSize); + + long long getLastLockTimestamp(); + void setLastLockTimestamp(long long lastLockTimestamp); + + + private: + kpr::RWMutex m_lockTreeMap; + std::map<long long, MessageExt*> m_msgTreeMap; + volatile long long m_queueOffsetMax ; + kpr::AtomicInteger m_msgCount; + volatile bool m_dropped; + volatile unsigned long long m_lastPullTimestamp; + volatile unsigned long long m_lastConsumeTimestamp; + + /** + * order message + */ + kpr::Mutex m_lockConsume; + volatile bool m_locked; + volatile unsigned long long m_lastLockTimestamp; + volatile bool m_consuming; + std::map<long long, MessageExt*> m_msgTreeMapTemp; + kpr::AtomicInteger m_tryUnlockTimes; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp b/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp new file mode 100755 index 0000000..c520e4c --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp @@ -0,0 +1,222 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "PullAPIWrapper.h" + +#include <stdlib.h> +#include <list> +#include <set> +#include "ScopedLock.h" +#include "MQClientFactory.h" +#include "PullCallback.h" +#include "MixAll.h" +#include "PullSysFlag.h" +#include "CommandCustomHeader.h" +#include "MQClientAPIImpl.h" +#include "MQClientException.h" +#include "SubscriptionData.h" +#include "UtilAll.h" +#include "MessageExt.h" +#include "PullResultExt.h" +#include "MessageDecoder.h" +#include "VirtualEnvUtil.h" + +namespace rmq +{ + +PullAPIWrapper::PullAPIWrapper(MQClientFactory* pMQClientFactory, const std::string& consumerGroup) +{ + m_pMQClientFactory = pMQClientFactory; + m_consumerGroup = consumerGroup; +} + +void PullAPIWrapper::updatePullFromWhichNode(MessageQueue& mq, long brokerId) +{ + std::map<MessageQueue, kpr::AtomicInteger>::iterator it; + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_pullFromWhichNodeTableLock); + it = m_pullFromWhichNodeTable.find(mq); + if (it != m_pullFromWhichNodeTable.end()) + { + it->second.set(brokerId); + return; + } + } + + kpr::ScopedWLock<kpr::RWMutex> lock(m_pullFromWhichNodeTableLock); + m_pullFromWhichNodeTable[mq] = kpr::AtomicInteger(brokerId); +} + +PullResult* PullAPIWrapper::processPullResult(MessageQueue& mq, + PullResult& pullResult, + SubscriptionData& subscriptionData) +{ + std::string projectGroupPrefix = m_pMQClientFactory->getMQClientAPIImpl()->getProjectGroupPrefix(); + PullResultExt& pullResultExt = (PullResultExt&) pullResult; + + updatePullFromWhichNode(mq, pullResultExt.suggestWhichBrokerId); + + if (pullResult.pullStatus == FOUND) + { + std::list<MessageExt*> msgList = + MessageDecoder::decodes(pullResultExt.messageBinary, pullResultExt.messageBinaryLen); + + std::list<MessageExt*> msgListFilterAgain; + + if (!subscriptionData.getTagsSet().empty()) + { + std::list<MessageExt*>::iterator it = msgList.begin(); + for (; it != msgList.end();) + { + MessageExt* msg = *it; + if (!msg->getTags().empty()) + { + std::set<std::string>& tags = subscriptionData.getTagsSet(); + if (tags.find(msg->getTags()) != tags.end()) + { + msgListFilterAgain.push_back(msg); + it = msgList.erase(it); + } + else + { + it++; + } + } + } + } + else + { + msgListFilterAgain.assign(msgList.begin(), msgList.end()); + msgList.clear(); + } + + if (!UtilAll::isBlank(projectGroupPrefix)) + { + subscriptionData.setTopic(VirtualEnvUtil::clearProjectGroup(subscriptionData.getTopic(), + projectGroupPrefix)); + mq.setTopic(VirtualEnvUtil::clearProjectGroup(mq.getTopic(), projectGroupPrefix)); + + std::list<MessageExt*>::iterator it = msgListFilterAgain.begin(); + for (; it != msgListFilterAgain.end(); it++) + { + MessageExt* msg = *it; + msg->setTopic(VirtualEnvUtil::clearProjectGroup(msg->getTopic(), projectGroupPrefix)); + + msg->putProperty(Message::PROPERTY_MIN_OFFSET, UtilAll::toString(pullResult.minOffset)); + msg->putProperty(Message::PROPERTY_MAX_OFFSET, UtilAll::toString(pullResult.maxOffset)); + } + } + else + { + std::list<MessageExt*>::iterator it = msgListFilterAgain.begin(); + for (; it != msgListFilterAgain.end(); it++) + { + MessageExt* msg = *it; + + msg->putProperty(Message::PROPERTY_MIN_OFFSET, UtilAll::toString(pullResult.minOffset)); + msg->putProperty(Message::PROPERTY_MAX_OFFSET, UtilAll::toString(pullResult.maxOffset)); + } + } + + std::list<MessageExt*>::iterator it = msgListFilterAgain.begin(); + for (; it != msgListFilterAgain.end(); it++) + { + pullResultExt.msgFoundList.push_back(*it); + } + + it = msgList.begin(); + for (; it != msgList.end(); it++) + { + delete *it; + } + + delete[] pullResultExt.messageBinary; + pullResultExt.messageBinary = NULL; + pullResultExt.messageBinaryLen = 0; + } + + return &pullResult; +} + +long PullAPIWrapper::recalculatePullFromWhichNode(MessageQueue& mq) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_pullFromWhichNodeTableLock); + std::map<MessageQueue, kpr::AtomicInteger>::iterator it = m_pullFromWhichNodeTable.find(mq); + if (it != m_pullFromWhichNodeTable.end()) + { + return it->second.get(); + } + + return MixAll::MASTER_ID; +} + +PullResult* PullAPIWrapper::pullKernelImpl(MessageQueue& mq, + const std::string& subExpression, + long long subVersion, + long long offset, + int maxNums, + int sysFlag, + long long commitOffset, + long long brokerSuspendMaxTimeMillis, + int timeoutMillis, + CommunicationMode communicationMode, + PullCallback* pPullCallback) +{ + FindBrokerResult findBrokerResult = + m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), + recalculatePullFromWhichNode(mq), false); + if (findBrokerResult.brokerAddr.empty()) + { + m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic()); + findBrokerResult = m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), + recalculatePullFromWhichNode(mq), false); + } + + if (!findBrokerResult.brokerAddr.empty()) + { + int sysFlagInner = sysFlag; + + if (findBrokerResult.slave) + { + sysFlagInner = PullSysFlag::clearCommitOffsetFlag(sysFlagInner); + } + + PullMessageRequestHeader* requestHeader = new PullMessageRequestHeader(); + requestHeader->consumerGroup = m_consumerGroup; + requestHeader->topic = mq.getTopic(); + requestHeader->queueId = mq.getQueueId(); + requestHeader->queueOffset = offset; + requestHeader->maxMsgNums = maxNums; + requestHeader->sysFlag = sysFlagInner; + requestHeader->commitOffset = commitOffset; + requestHeader->suspendTimeoutMillis = brokerSuspendMaxTimeMillis; + requestHeader->subscription = subExpression; + requestHeader->subVersion = subVersion; + + PullResult* pullResult = m_pMQClientFactory->getMQClientAPIImpl()->pullMessage(// + findBrokerResult.brokerAddr,// + requestHeader,// + timeoutMillis,// + communicationMode,// + pPullCallback); + + return pullResult; + } + + THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h b/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h new file mode 100755 index 0000000..d5ec787 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h @@ -0,0 +1,67 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __PULLAPIWRAPPER_H__ +#define __PULLAPIWRAPPER_H__ + +#include <string> +#include <map> + +#include "AtomicValue.h" +#include "PullResult.h" +#include "MessageQueue.h" +#include "CommunicationMode.h" +#include "Mutex.h" + +namespace rmq +{ + class MQClientFactory; + class PullCallback; + class SubscriptionData; + + class PullAPIWrapper + { + public: + PullAPIWrapper(MQClientFactory* pMQClientFactory, const std::string& consumerGroup); + void updatePullFromWhichNode(MessageQueue& mq, long brokerId); + + + PullResult* processPullResult(MessageQueue& mq, + PullResult& pullResult, + SubscriptionData& subscriptionData); + long recalculatePullFromWhichNode(MessageQueue& mq); + + PullResult* pullKernelImpl(MessageQueue& mq, + const std::string& subExpression, + long long subVersion, + long long offset, + int maxNums, + int sysFlag, + long long commitOffset, + long long brokerSuspendMaxTimeMillis, + int timeoutMillis, + CommunicationMode communicationMode, + PullCallback* pPullCallback); + + private: + std::map<MessageQueue, kpr::AtomicInteger> m_pullFromWhichNodeTable; + kpr::RWMutex m_pullFromWhichNodeTableLock; + MQClientFactory* m_pMQClientFactory; + std::string m_consumerGroup; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullMessageService.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/PullMessageService.cpp b/rocketmq-client4cpp/src/consumer/PullMessageService.cpp new file mode 100755 index 0000000..6d9972e --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/PullMessageService.cpp @@ -0,0 +1,171 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "PullMessageService.h" +#include <list> +#include "MQClientFactory.h" +#include "MQConsumerInner.h" +#include "PullRequest.h" +#include "DefaultMQPushConsumerImpl.h" +#include "ScopedLock.h" + +namespace rmq +{ + +class SubmitPullRequestLater : public kpr::TimerHandler +{ +public: + SubmitPullRequestLater(PullMessageService* pService, PullRequest* pPullRequest) + : m_pService(pService), m_pPullRequest(pPullRequest) + { + + } + + void OnTimeOut(unsigned int timerID) + { + try + { + m_pService->executePullRequestImmediately(m_pPullRequest); + } + catch(...) + { + RMQ_ERROR("SubmitPullRequestLater OnTimeOut exception"); + } + + delete this; + } + +private: + PullMessageService* m_pService; + PullRequest* m_pPullRequest; +}; + + +PullMessageService::PullMessageService(MQClientFactory* pMQClientFactory) + : ServiceThread("PullMessageService"), + m_pMQClientFactory(pMQClientFactory) +{ + m_TimerThread = new kpr::TimerThread("PullMessageService-timer", 10); + m_TimerThread->Start(); +} + + +PullMessageService::~PullMessageService() +{ + +} + + +void PullMessageService::executePullRequestLater(PullRequest* pPullRequest, long timeDelay) +{ + SubmitPullRequestLater* pHandler = new SubmitPullRequestLater(this, pPullRequest); + m_TimerThread->RegisterTimer(0, timeDelay, pHandler, false); +} + + +void PullMessageService::executeTaskLater(kpr::TimerHandler* pHandler, long timeDelay) +{ + m_TimerThread->RegisterTimer(0, timeDelay, pHandler, false); +} + + +void PullMessageService::executePullRequestImmediately(PullRequest* pPullRequest) +{ + try + { + { + kpr::ScopedLock<kpr::Mutex> lock(m_lock); + m_pullRequestQueue.push_back(pPullRequest); + } + + wakeup(); + } + catch (...) + { + RMQ_ERROR("executePullRequestImmediately pullRequestQueue.push"); + } +} + +void PullMessageService::Run() +{ + RMQ_INFO("%s service started", getServiceName().c_str()); + + while (!m_stoped) + { + try + { + bool wait = false; + { + kpr::ScopedLock<kpr::Mutex> lock(m_lock); + if (m_pullRequestQueue.empty()) + { + wait = true; + } + } + + if (wait) + { + waitForRunning(5000); + } + + PullRequest* pullRequest = NULL; + { + kpr::ScopedLock<kpr::Mutex> lock(m_lock); + if (!m_pullRequestQueue.empty()) + { + pullRequest = m_pullRequestQueue.front(); + m_pullRequestQueue.pop_front(); + } + } + + if (pullRequest != NULL) + { + pullMessage(pullRequest); + } + } + catch (...) + { + RMQ_ERROR("Pull Message Service Run Method exception"); + } + } + + m_TimerThread->Stop(); + m_TimerThread->Join(); + + RMQ_INFO("%s service end", getServiceName().c_str()); +} + +std::string PullMessageService::getServiceName() +{ + return "PullMessageService"; +} + + +void PullMessageService::pullMessage(PullRequest* pPullRequest) +{ + MQConsumerInner* consumer = m_pMQClientFactory->selectConsumer(pPullRequest->getConsumerGroup()); + if (consumer != NULL) + { + DefaultMQPushConsumerImpl* impl = (DefaultMQPushConsumerImpl*) consumer; + impl->pullMessage(pPullRequest); + } + else + { + RMQ_WARN("No matched consumer for the PullRequest {%s}, drop it", pPullRequest->toString().c_str()); + } +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullMessageService.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/PullMessageService.h b/rocketmq-client4cpp/src/consumer/PullMessageService.h new file mode 100755 index 0000000..d6ebcee --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/PullMessageService.h @@ -0,0 +1,56 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __PULLMESSAGESERVICE_H__ +#define __PULLMESSAGESERVICE_H__ + +#include <list> +#include "RocketMQClient.h" +#include "ServiceThread.h" +#include "TimerThread.h" +#include "PullRequest.h" + +namespace rmq +{ + class MQClientFactory; + class MQConsumerInner; + class PullRequest; + + class PullMessageService : public ServiceThread + { + public: + PullMessageService(MQClientFactory* pMQClientFactory); + ~PullMessageService(); + + void executePullRequestLater(PullRequest* pPullRequest, long timeDelay); + void executeTaskLater(kpr::TimerHandler* pHandler, long timeDelay); + + void executePullRequestImmediately(PullRequest* pPullRequest); + std::string getServiceName(); + + void Run(); + private: + void pullMessage(PullRequest* pPullRequest); + + private: + std::list<PullRequest*> m_pullRequestQueue; + kpr::Mutex m_lock; + MQClientFactory* m_pMQClientFactory; + kpr::TimerThreadPtr m_TimerThread; + }; + typedef kpr::RefHandleT<PullMessageService> PullMessageServicePtr; +} + +#endif
