http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp b/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp new file mode 100755 index 0000000..c7d9695 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp @@ -0,0 +1,574 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include <list> +#include <string> + +#include "ConsumeMessageOrderlyService.h" +#include "DefaultMQPushConsumerImpl.h" +#include "MQClientFactory.h" +#include "DefaultMQProducer.h" +#include "MessageListener.h" +#include "MessageQueue.h" +#include "RebalanceImpl.h" +#include "DefaultMQPushConsumer.h" +#include "OffsetStore.h" +#include "ScopedLock.h" +#include "KPRUtil.h" +#include "MixAll.h" +#include "UtilAll.h" + +namespace rmq +{ + +class LockMq : public kpr::TimerHandler +{ +public: + LockMq(ConsumeMessageOrderlyService* pService) + : m_pService(pService) + { + + } + + void OnTimeOut(unsigned int timerID) + { + m_pService->lockMQPeriodically(); + + // can not delete + //delete this; + } + +private: + ConsumeMessageOrderlyService* m_pService; +}; + +class SubmitConsumeRequestLaterOrderly : public kpr::TimerHandler +{ +public: + SubmitConsumeRequestLaterOrderly(ProcessQueue* pProcessQueue, + const MessageQueue& messageQueue, + ConsumeMessageOrderlyService* pService) + : m_pProcessQueue(pProcessQueue), + m_messageQueue(messageQueue), + m_pService(pService) + { + + } + + void OnTimeOut(unsigned int timerID) + { + try + { + std::list<MessageExt*> msgs; + m_pService->submitConsumeRequest(msgs, m_pProcessQueue, m_messageQueue, true); + } + catch(...) + { + RMQ_ERROR("SubmitConsumeRequestLaterOrderly OnTimeOut exception"); + } + + delete this; + } + +private: + ProcessQueue* m_pProcessQueue; + MessageQueue m_messageQueue; + ConsumeMessageOrderlyService* m_pService; +}; + + +class TryLockLaterAndReconsume : public kpr::TimerHandler +{ +public: + TryLockLaterAndReconsume(ProcessQueue* pProcessQueue, + MessageQueue& messageQueue, + ConsumeMessageOrderlyService* pService) + : m_pProcessQueue(pProcessQueue), + m_messageQueue(messageQueue), + m_pService(pService) + { + + } + + void OnTimeOut(unsigned int timerID) + { + try + { + bool lockOK = m_pService->lockOneMQ(m_messageQueue); + if (lockOK) + { + m_pService->submitConsumeRequestLater(m_pProcessQueue, m_messageQueue, 10); + } + else + { + m_pService->submitConsumeRequestLater(m_pProcessQueue, m_messageQueue, 3000); + } + } + catch(...) + { + RMQ_ERROR("TryLockLaterAndReconsume OnTimeOut exception"); + } + + delete this; + } + +private: + ProcessQueue* m_pProcessQueue; + MessageQueue m_messageQueue; + ConsumeMessageOrderlyService* m_pService; +}; + + + +ConsumeMessageOrderlyService::ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl, + MessageListenerOrderly* pMessageListener) +{ + m_stoped = false; + m_pDefaultMQPushConsumerImpl = pDefaultMQPushConsumerImpl; + m_pMessageListener = pMessageListener; + m_pDefaultMQPushConsumer = m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer(); + m_consumerGroup = m_pDefaultMQPushConsumer->getConsumerGroup(); + m_pConsumeExecutor = new kpr::ThreadPool("ConsumeMessageThreadPool", 1, + m_pDefaultMQPushConsumer->getConsumeThreadMin(), m_pDefaultMQPushConsumer->getConsumeThreadMax()); + m_scheduledExecutorService = new kpr::TimerThread("ConsumeMessageConcurrentlyService", 10); +} + +ConsumeMessageOrderlyService::~ConsumeMessageOrderlyService() +{ +} + + +void ConsumeMessageOrderlyService::start() +{ + m_scheduledExecutorService->Start(); + + LockMq* lm = new LockMq(this); + m_scheduledExecutorService->RegisterTimer(0, ProcessQueue::s_RebalanceLockInterval, lm, true); +} + +void ConsumeMessageOrderlyService::shutdown() +{ + m_stoped = true; + m_pConsumeExecutor->Destroy(); + m_scheduledExecutorService->Stop(); + m_scheduledExecutorService->Join(); + unlockAllMQ(); +} + +void ConsumeMessageOrderlyService::unlockAllMQ() +{ + m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->unlockAll(false); +} + +void ConsumeMessageOrderlyService::lockMQPeriodically() +{ + if (!m_stoped) + { + m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->lockAll(); + } +} + +bool ConsumeMessageOrderlyService::lockOneMQ(MessageQueue& mq) +{ + if (!m_stoped) + { + return m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->lock(mq); + } + + return false; +} + +void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(MessageQueue& messageQueue, + ProcessQueue* pProcessQueue, + long long delayMills) +{ + TryLockLaterAndReconsume* consume = new TryLockLaterAndReconsume(pProcessQueue, messageQueue, this); + m_scheduledExecutorService->RegisterTimer(0, int(delayMills), consume, false); +} + +ConsumerStat& ConsumeMessageOrderlyService::getConsumerStat() +{ + return m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat(); +} + +void ConsumeMessageOrderlyService::submitConsumeRequestLater(ProcessQueue* pProcessQueue, + const MessageQueue& messageQueue, + long long suspendTimeMillis) +{ + long timeMillis = long(suspendTimeMillis); + if (timeMillis < 10) + { + timeMillis = 10; + } + else if (timeMillis > 30000) + { + timeMillis = 30000; + } + + SubmitConsumeRequestLaterOrderly* sc = new SubmitConsumeRequestLaterOrderly(pProcessQueue, messageQueue, this); + m_scheduledExecutorService->RegisterTimer(0, timeMillis, sc, false); +} + +void ConsumeMessageOrderlyService::submitConsumeRequest(std::list<MessageExt*>& msgs, + ProcessQueue* pProcessQueue, + const MessageQueue& messageQueue, + bool dispathToConsume) +{ + if (dispathToConsume) + { + kpr::ThreadPoolWorkPtr consumeRequest = new ConsumeOrderlyRequest(pProcessQueue, messageQueue, this); + m_pConsumeExecutor->AddWork(consumeRequest); + } +} + +void ConsumeMessageOrderlyService::updateCorePoolSize(int corePoolSize) +{ +} + + +std::string& ConsumeMessageOrderlyService::getConsumerGroup() +{ + return m_consumerGroup; +} + +MessageListenerOrderly* ConsumeMessageOrderlyService::getMessageListener() +{ + return m_pMessageListener; +} + +DefaultMQPushConsumerImpl* ConsumeMessageOrderlyService::getDefaultMQPushConsumerImpl() +{ + return m_pDefaultMQPushConsumerImpl; +} + +bool ConsumeMessageOrderlyService::processConsumeResult(std::list<MessageExt*>& msgs, + ConsumeOrderlyStatus status, + ConsumeOrderlyContext& context, + ConsumeOrderlyRequest& consumeRequest) +{ + bool continueConsume = true; + long long commitOffset = -1L; + int msgsSize = msgs.size(); + + if (context.autoCommit) + { + switch (status) + { + case COMMIT: + case ROLLBACK: + RMQ_WARN("the message queue consume result is illegal, we think you want to ack these message: %s", + consumeRequest.getMessageQueue().toString().c_str()); + case SUCCESS: + getConsumerStat().consumeMsgOKTotal.fetchAndAdd(msgsSize); + commitOffset = consumeRequest.getProcessQueue()->commit(); + break; + case SUSPEND_CURRENT_QUEUE_A_MOMENT: + getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(msgsSize); + if (checkReconsumeTimes(msgs)) + { + consumeRequest.getProcessQueue()->makeMessageToCosumeAgain(msgs); + submitConsumeRequestLater(consumeRequest.getProcessQueue(), + consumeRequest.getMessageQueue(), + context.suspendCurrentQueueTimeMillis); + continueConsume = false; + } + else + { + commitOffset = consumeRequest.getProcessQueue()->commit(); + } + + break; + default: + break; + } + } + else + { + switch (status) + { + case SUCCESS: + getConsumerStat().consumeMsgOKTotal.fetchAndAdd(msgsSize); + break; + case COMMIT: + commitOffset = consumeRequest.getProcessQueue()->commit(); + break; + case ROLLBACK: + consumeRequest.getProcessQueue()->rollback(); + submitConsumeRequestLater(consumeRequest.getProcessQueue(), + consumeRequest.getMessageQueue(), + context.suspendCurrentQueueTimeMillis); + continueConsume = false; + break; + case SUSPEND_CURRENT_QUEUE_A_MOMENT: + getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(msgsSize); + if (checkReconsumeTimes(msgs)) + { + consumeRequest.getProcessQueue()->makeMessageToCosumeAgain(msgs); + submitConsumeRequestLater(consumeRequest.getProcessQueue(), + consumeRequest.getMessageQueue(), + context.suspendCurrentQueueTimeMillis); + continueConsume = false; + } + break; + default: + break; + } + } + + if (commitOffset >= 0 && !consumeRequest.getProcessQueue()->isDropped()) + { + m_pDefaultMQPushConsumerImpl->getOffsetStore()->updateOffset(consumeRequest.getMessageQueue(), + commitOffset, false); + } + + return continueConsume; +} + +bool ConsumeMessageOrderlyService::checkReconsumeTimes(std::list<MessageExt*>& msgs) +{ + bool suspend = false; + + if (!msgs.empty()) + { + std::list<MessageExt*>::iterator it = msgs.begin(); + for (; it != msgs.end(); it++) + { + MessageExt* msg = *it; + if (msg->getReconsumeTimes() >= m_pDefaultMQPushConsumer->getMaxReconsumeTimes()) + { + msg->putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(msg->getReconsumeTimes())); + + if (!sendMessageBack(*msg)) + { + suspend = true; + msg->setReconsumeTimes(msg->getReconsumeTimes() + 1); + } + } + else + { + suspend = true; + msg->setReconsumeTimes(msg->getReconsumeTimes() + 1); + } + } + } + + return suspend; +} + +bool ConsumeMessageOrderlyService::sendMessageBack(MessageExt& msg) +{ + try + { + Message newMsg(MixAll::getRetryTopic(m_pDefaultMQPushConsumer->getConsumerGroup()), + msg.getBody(), msg.getBodyLen()); + + std::string originMsgId = msg.getProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID); + newMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, UtilAll::isBlank(originMsgId) ? msg.getMsgId() + : originMsgId); + + newMsg.setFlag(msg.getFlag()); + newMsg.setProperties(msg.getProperties()); + newMsg.putProperty(Message::PROPERTY_RETRY_TOPIC, msg.getTopic()); + + int reTimes = msg.getReconsumeTimes() + 1; + newMsg.putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(reTimes)); + newMsg.putProperty(Message::PROPERTY_MAX_RECONSUME_TIMES, UtilAll::toString(m_pDefaultMQPushConsumer->getMaxReconsumeTimes())); + newMsg.setDelayTimeLevel(3 + reTimes); + + m_pDefaultMQPushConsumerImpl->getmQClientFactory()->getDefaultMQProducer()->send(newMsg); + + return true; + } + catch (...) + { + RMQ_ERROR("sendMessageBack exception, group: %s, msg: %s", + m_consumerGroup.c_str(), msg.toString().c_str()); + } + + return false; +} + + +MessageQueueLock& ConsumeMessageOrderlyService::getMessageQueueLock() +{ + return m_messageQueueLock; +} + +DefaultMQPushConsumer* ConsumeMessageOrderlyService::getDefaultMQPushConsumer() +{ + return m_pDefaultMQPushConsumer; +} + +ConsumeOrderlyRequest::ConsumeOrderlyRequest(ProcessQueue* pProcessQueue, + const MessageQueue& messageQueue, + ConsumeMessageOrderlyService* pService) +{ + m_pProcessQueue = pProcessQueue; + m_messageQueue = messageQueue; + m_pService = pService; +} + +ConsumeOrderlyRequest::~ConsumeOrderlyRequest() +{ +} + +void ConsumeOrderlyRequest::Do() +{ + if (m_pProcessQueue->isDropped()) + { + RMQ_WARN("run, the message queue not be able to consume, because it's dropped, MQ: %s", + m_messageQueue.toString().c_str()); + return; + } + + try + { + kpr::Mutex* objLock = m_pService->getMessageQueueLock().fetchLockObject(m_messageQueue); + { + kpr::ScopedLock<kpr::Mutex> lock(*objLock); + + MessageModel messageModel = m_pService->getDefaultMQPushConsumerImpl()->messageModel(); + if (BROADCASTING == messageModel + || (m_pProcessQueue->isLocked() || !m_pProcessQueue->isLockExpired())) + { + long long beginTime = KPRUtil::GetCurrentTimeMillis(); + for (bool continueConsume = true; continueConsume;) + { + if (m_pProcessQueue->isDropped()) + { + RMQ_INFO("the message queue not be able to consume, because it's droped, MQ: %s", + m_messageQueue.toString().c_str()); + break; + } + + if (CLUSTERING == messageModel + && !m_pProcessQueue->isLocked()) + { + RMQ_WARN("the message queue not locked, so consume later, MQ: %s", m_messageQueue.toString().c_str()); + m_pService->tryLockLaterAndReconsume(m_messageQueue, m_pProcessQueue, 10); + break; + } + + if (CLUSTERING == messageModel + && m_pProcessQueue->isLockExpired()) + { + RMQ_WARN("the message queue lock expired, so consume later, MQ: %s", m_messageQueue.toString().c_str()); + m_pService->tryLockLaterAndReconsume(m_messageQueue, m_pProcessQueue, 10); + break; + } + + long interval = long(KPRUtil::GetCurrentTimeMillis() - beginTime); + if (interval > ConsumeMessageOrderlyService::s_MaxTimeConsumeContinuously) + { + m_pService->submitConsumeRequestLater(m_pProcessQueue, m_messageQueue, 10); + break; + } + + int consumeBatchSize = + m_pService->getDefaultMQPushConsumer()->getConsumeMessageBatchMaxSize(); + + std::list<MessageExt*> msgs = m_pProcessQueue->takeMessages(consumeBatchSize); + if (!msgs.empty()) + { + ConsumeOrderlyContext context(m_messageQueue); + + ConsumeOrderlyStatus status = SUSPEND_CURRENT_QUEUE_A_MOMENT; + + ConsumeMessageContext consumeMessageContext; + if (m_pService->getDefaultMQPushConsumerImpl()->hasHook()) + { + consumeMessageContext.consumerGroup = m_pService->getConsumerGroup(); + consumeMessageContext.mq = m_messageQueue; + consumeMessageContext.msgList = msgs; + consumeMessageContext.success = false; + m_pService->getDefaultMQPushConsumerImpl()->executeHookBefore(consumeMessageContext); + } + + long long beginTimestamp = KPRUtil::GetCurrentTimeMillis(); + try + { + kpr::ScopedLock<kpr::Mutex> lock(m_pProcessQueue->getLockConsume()); + if (m_pProcessQueue->isDropped()) + { + RMQ_WARN("consumeMessage, the message queue not be able to consume, because it's dropped, MQ: %s", + m_messageQueue.toString().c_str()); + break; + } + + status = m_pService->getMessageListener()->consumeMessage(msgs, context); + } + catch (...) + { + RMQ_WARN("consumeMessage exception, Group: {%s}, Msgs: {%u}, MQ: %s",// + m_pService->getConsumerGroup().c_str(), + (unsigned)msgs.size(), + m_messageQueue.toString().c_str()); + } + + long long consumeRT = KPRUtil::GetCurrentTimeMillis() - beginTimestamp; + + if (SUSPEND_CURRENT_QUEUE_A_MOMENT == status + || ROLLBACK == status) + { + RMQ_WARN("consumeMessage Orderly return not OK, Group: {%s} Msgs: {%u} MQ: %s",// + m_pService->getConsumerGroup().c_str(), + (unsigned)msgs.size(), + m_messageQueue.toString().c_str()); + //status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; + } + + if (m_pService->getDefaultMQPushConsumerImpl()->hasHook()) + { + consumeMessageContext.success = (SUCCESS == status + || COMMIT == status); + m_pService->getDefaultMQPushConsumerImpl()->executeHookAfter(consumeMessageContext); + } + + m_pService->getConsumerStat().consumeMsgRTTotal.fetchAndAdd(consumeRT); + MixAll::compareAndIncreaseOnly(m_pService->getConsumerStat() + .consumeMsgRTMax, consumeRT); + + continueConsume = m_pService->processConsumeResult(msgs, status, context, *this); + } + else + { + continueConsume = false; + } + } + } + else + { + if (m_pProcessQueue->isDropped()) + { + RMQ_WARN("consumeMessage, the message queue not be able to consume, because it's dropped, MQ: %s", + m_messageQueue.toString().c_str()); + return; + } + + m_pService->tryLockLaterAndReconsume(m_messageQueue, m_pProcessQueue, 100); + } + } + } + catch(...) + { + RMQ_WARN("ConsumeOrderlyRequest exception"); + } + + return; +} + +} +
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h b/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h new file mode 100755 index 0000000..0f8628b --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h @@ -0,0 +1,122 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __CONSUMEMESSAGEORDERLYSERVICE_H__ +#define __CONSUMEMESSAGEORDERLYSERVICE_H__ + +#include "ConsumeMessageService.h" + +#include <list> +#include <string> +#include "RocketMQClient.h" +#include "ConsumerStatManage.h" +#include "MessageQueueLock.h" +#include "MessageListener.h" +#include "ThreadPool.h" +#include "TimerThread.h" + +namespace rmq +{ +class DefaultMQPushConsumerImpl; +class MessageListenerOrderly; +class DefaultMQPushConsumer; +class ConsumeMessageOrderlyService; + +class ConsumeOrderlyRequest: public kpr::ThreadPoolWork +{ +public: + ConsumeOrderlyRequest(ProcessQueue *pProcessQueue, + const MessageQueue &messageQueue, + ConsumeMessageOrderlyService *pService); + ~ConsumeOrderlyRequest(); + + virtual void Do(); + + ProcessQueue *getProcessQueue() + { + return m_pProcessQueue; + } + + MessageQueue &getMessageQueue() + { + return m_messageQueue; + } + +private: + ProcessQueue *m_pProcessQueue; + MessageQueue m_messageQueue; + ConsumeMessageOrderlyService *m_pService; +}; + + +class ConsumeMessageOrderlyService : public ConsumeMessageService +{ +public: + static const long s_MaxTimeConsumeContinuously = 60000; + +public: + ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl + *pDefaultMQPushConsumerImpl, + MessageListenerOrderly *pMessageListener); + ~ConsumeMessageOrderlyService(); + + void start(); + void shutdown(); + + void unlockAllMQ(); + void lockMQPeriodically(); + bool lockOneMQ(MessageQueue &mq); + void tryLockLaterAndReconsume(MessageQueue &messageQueue, + ProcessQueue *pProcessQueue, + long long delayMills); + bool processConsumeResult(std::list<MessageExt *> &msgs, + ConsumeOrderlyStatus status, + ConsumeOrderlyContext &context, + ConsumeOrderlyRequest &consumeRequest); + bool checkReconsumeTimes(std::list<MessageExt *> &msgs); + bool sendMessageBack(MessageExt &msg); + ConsumerStat& getConsumerStat(); + + void submitConsumeRequestLater(ProcessQueue *pProcessQueue, + const MessageQueue &messageQueue, + long long suspendTimeMillis); + + void submitConsumeRequest(std::list<MessageExt *> &msgs, + ProcessQueue *pProcessQueue, + const MessageQueue &messageQueue, + bool dispathToConsume); + + void updateCorePoolSize(int corePoolSize); + MessageQueueLock &getMessageQueueLock(); + std::string &getConsumerGroup(); + MessageListenerOrderly *getMessageListener(); + DefaultMQPushConsumerImpl *getDefaultMQPushConsumerImpl(); + DefaultMQPushConsumer *getDefaultMQPushConsumer(); + +private: + volatile bool m_stoped; + DefaultMQPushConsumerImpl *m_pDefaultMQPushConsumerImpl; + DefaultMQPushConsumer *m_pDefaultMQPushConsumer; + MessageListenerOrderly *m_pMessageListener; + std::string m_consumerGroup; + MessageQueueLock m_messageQueueLock; + + kpr::ThreadPoolPtr m_pConsumeExecutor; + kpr::TimerThreadPtr m_scheduledExecutorService; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h b/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h new file mode 100755 index 0000000..57a9bee --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h @@ -0,0 +1,41 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __CONSUMEMESSAGESERVICE_H__ +#define __CONSUMEMESSAGESERVICE_H__ + +#include <list> + +namespace rmq +{ + class MessageExt; + class ProcessQueue; + class MessageQueue; + + class ConsumeMessageService + { + public: + virtual ~ConsumeMessageService() {} + virtual void start() = 0; + virtual void shutdown() = 0; + virtual void updateCorePoolSize(int corePoolSize) = 0; + virtual void submitConsumeRequest(std::list<MessageExt*>& msgs, + ProcessQueue* pProcessQueue, + const MessageQueue& messageQueue, + bool dispathToConsume) = 0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumeType.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ConsumeType.cpp b/rocketmq-client4cpp/src/consumer/ConsumeType.cpp new file mode 100755 index 0000000..6ef5837 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/ConsumeType.cpp @@ -0,0 +1,70 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "ConsumeType.h" + +namespace rmq +{ + +const char* getConsumeTypeString(ConsumeType type) +{ + switch (type) + { + case CONSUME_ACTIVELY: + return "CONSUME_ACTIVELY"; + case CONSUME_PASSIVELY: + return "CONSUME_PASSIVELY"; + } + + return "UnknowConsumeType"; +} + +const char* getConsumeFromWhereString(ConsumeFromWhere type) +{ + switch (type) + { + case CONSUME_FROM_LAST_OFFSET: + return "CONSUME_FROM_LAST_OFFSET"; + case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: + return "CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST"; + case CONSUME_FROM_MAX_OFFSET: + return "CONSUME_FROM_MAX_OFFSET"; + case CONSUME_FROM_MIN_OFFSET: + return "CONSUME_FROM_MIN_OFFSET"; + case CONSUME_FROM_FIRST_OFFSET: + return "CONSUME_FROM_FIRST_OFFSET"; + case CONSUME_FROM_TIMESTAMP: + return "CONSUME_FROM_TIMESTAMP"; + } + + return "UnknowConsumeFromWhere"; +} + +const char* getMessageModelString(MessageModel type) +{ + switch (type) + { + case CLUSTERING: + return "CLUSTERING"; + case BROADCASTING: + return "BROADCASTING"; + } + + return "UnknowMessageModel"; +} + +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp b/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp new file mode 100755 index 0000000..c9dc304 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp @@ -0,0 +1,96 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "ConsumerInvokeCallback.h" +#include "ResponseFuture.h" +#include "PullResult.h" +#include "MQClientAPIImpl.h" +#include "PullCallback.h" +#include "MQClientException.h" +#include "RemotingCommand.h" + +namespace rmq +{ + +ConsumerInvokeCallback::ConsumerInvokeCallback(PullCallback* pPullCallback, MQClientAPIImpl* pMQClientAPIImpl) + : m_pPullCallback(pPullCallback), + m_pMQClientAPIImpl(pMQClientAPIImpl) +{ +} + +ConsumerInvokeCallback::~ConsumerInvokeCallback() +{ + if (m_pPullCallback != NULL) + { + delete m_pPullCallback; + m_pPullCallback = NULL; + } +} + +void ConsumerInvokeCallback::operationComplete(ResponseFuturePtr pResponseFuture) +{ + if (m_pPullCallback == NULL) + { + delete this; + return; + } + + RemotingCommand* response = pResponseFuture->getResponseCommand(); + if (response != NULL) + { + try + { + PullResult* pullResult = m_pMQClientAPIImpl->processPullResponse(response); + response->setBody(NULL, 0, false); + + m_pPullCallback->onSuccess(*pullResult); + + pullResult->msgFoundList.clear(); + delete pullResult; + } + catch (MQException& e) + { + m_pPullCallback->onException(e); + } + + delete response; + } + else + { + if (!pResponseFuture->isSendRequestOK()) + { + std::string msg = "send request failed"; + MQClientException e(msg, -1, __FILE__, __LINE__); + m_pPullCallback->onException(e); + } + else if (pResponseFuture->isTimeout()) + { + std::string msg = "wait response timeout"; + MQClientException e(msg, -1, __FILE__, __LINE__); + m_pPullCallback->onException(e); + } + else + { + std::string msg = "unknow reseaon"; + MQClientException e(msg, -1, __FILE__, __LINE__); + m_pPullCallback->onException(e); + } + } + + delete this; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h b/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h new file mode 100755 index 0000000..675f2fd --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h @@ -0,0 +1,40 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __CONSUMER_INVOKECALLBACK_H__ +#define __CONSUMER_INVOKECALLBACK_H__ + +#include "InvokeCallback.h" + +namespace rmq +{ + class PullCallback; + class MQClientAPIImpl; + + class ConsumerInvokeCallback : public InvokeCallback + { + public: + ConsumerInvokeCallback(PullCallback* pPullCallback, MQClientAPIImpl* pMQClientAPIImpl); + virtual ~ConsumerInvokeCallback(); + virtual void operationComplete(ResponseFuturePtr pResponseFuture); + + private: + PullCallback* m_pPullCallback; + MQClientAPIImpl* m_pMQClientAPIImpl; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h b/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h new file mode 100755 index 0000000..92cf74c --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h @@ -0,0 +1,132 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __CONSUMERSTAT_H__ +#define __CONSUMERSTAT_H__ + +#include <list> +#include <string> + +#include "AtomicValue.h" +#include "KPRUtil.h" +#include "Mutex.h" +#include "ScopedLock.h" + +namespace rmq +{ + struct ConsumerStat + { + long long createTimestamp; + kpr::AtomicLong consumeMsgRTMax; + kpr::AtomicLong consumeMsgRTTotal; + kpr::AtomicLong consumeMsgOKTotal; + kpr::AtomicLong consumeMsgFailedTotal; + kpr::AtomicLong pullRTTotal; + kpr::AtomicLong pullTimesTotal; + + ConsumerStat() + { + createTimestamp = KPRUtil::GetCurrentTimeMillis(); + consumeMsgRTMax = 0; + consumeMsgRTTotal = 0; + consumeMsgOKTotal = 0; + consumeMsgFailedTotal = 0; + pullRTTotal = 0; + pullTimesTotal = 0; + } + }; + + + class ConsumerStatManager + { + public: + ConsumerStat& getConsumertat() + { + return m_consumertat; + } + + std::list<ConsumerStat>& getSnapshotList() + { + return m_snapshotList; + } + + /** + * every 1s + */ + void recordSnapshotPeriodically() + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_snapshotListLock); + m_snapshotList.push_back(m_consumertat); + if (m_snapshotList.size() > 60) + { + m_snapshotList.pop_front(); + } + } + + /** + * every 1m + */ + void logStatsPeriodically(std::string& group, std::string& clientId) + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_snapshotListLock); + if (m_snapshotList.size() >= 60) + { + ConsumerStat& first = m_snapshotList.front(); + ConsumerStat& last = m_snapshotList.back(); + + { + double avgRT = (last.consumeMsgRTTotal.get() - first.consumeMsgRTTotal.get()) + / + (double)((last.consumeMsgOKTotal.get() + last.consumeMsgFailedTotal.get()) + - (first.consumeMsgOKTotal.get() + first.consumeMsgFailedTotal.get())); + + double tps = ((last.consumeMsgOKTotal.get() + last.consumeMsgFailedTotal.get()) + - (first.consumeMsgOKTotal.get() + first.consumeMsgFailedTotal.get())) + / (double)(last.createTimestamp - first.createTimestamp); + + tps *= 1000; + + RMQ_INFO( + "Consumer, {%s} {%s}, ConsumeAvgRT: {%f} ConsumeMaxRT: {%lld} TotalOKMsg: {%lld} TotalFailedMsg: {%lld} consumeTPS: {%f}", + group.c_str(), + clientId.c_str(), + avgRT, + last.consumeMsgRTMax.get(), + last.consumeMsgOKTotal.get(), + last.consumeMsgFailedTotal.get(), + tps); + } + + { + double avgRT = (last.pullRTTotal.get() - first.pullRTTotal.get()) + / (double)(last.pullTimesTotal.get() - first.pullTimesTotal.get()); + + RMQ_INFO("Consumer, {%s} {%s}, PullAvgRT: {%f} PullTimesTotal: {%lld}", + group.c_str(), + clientId.c_str(), + avgRT, + last.pullTimesTotal.get()); + } + } + } + + private: + ConsumerStat m_consumertat; + std::list<ConsumerStat> m_snapshotList; + kpr::RWMutex m_snapshotListLock; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp new file mode 100755 index 0000000..67a8c8c --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp @@ -0,0 +1,309 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "DefaultMQPullConsumer.h" + +#include <list> +#include <string> + +#include "MessageQueue.h" +#include "MessageExt.h" +#include "ClientConfig.h" +#include "DefaultMQPullConsumerImpl.h" +#include "MixAll.h" +#include "AllocateMessageQueueStrategyInner.h" + +namespace rmq +{ + +DefaultMQPullConsumer::DefaultMQPullConsumer() + : m_consumerGroup(MixAll::DEFAULT_CONSUMER_GROUP), + m_brokerSuspendMaxTimeMillis(1000 * 20), + m_consumerTimeoutMillisWhenSuspend(1000 * 30), + m_consumerPullTimeoutMillis(1000 * 10), + m_messageModel(CLUSTERING), + m_pMessageQueueListener(NULL), + m_pOffsetStore(NULL), + m_pAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()), + m_unitMode(false), + m_maxReconsumeTimes(16) +{ + m_pDefaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this); +} + +DefaultMQPullConsumer::DefaultMQPullConsumer(const std::string& consumerGroup) + : m_consumerGroup(consumerGroup), + m_brokerSuspendMaxTimeMillis(1000 * 20), + m_consumerTimeoutMillisWhenSuspend(1000 * 30), + m_consumerPullTimeoutMillis(1000 * 10), + m_messageModel(CLUSTERING), + m_pMessageQueueListener(NULL), + m_pOffsetStore(NULL), + m_pAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()), + m_unitMode(false), + m_maxReconsumeTimes(16) +{ + m_pDefaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this); +} + +DefaultMQPullConsumer::~DefaultMQPullConsumer() +{ + //memleak or coredump + if (m_pAllocateMessageQueueStrategy) + delete m_pAllocateMessageQueueStrategy; + if (m_pDefaultMQPullConsumerImpl) + delete m_pDefaultMQPullConsumerImpl; +} + +//MQAdmin +void DefaultMQPullConsumer::createTopic(const std::string& key, const std::string& newTopic, int queueNum) +{ + m_pDefaultMQPullConsumerImpl->createTopic(key, newTopic, queueNum); +} + +long long DefaultMQPullConsumer::searchOffset(const MessageQueue& mq, long long timestamp) +{ + return m_pDefaultMQPullConsumerImpl->searchOffset(mq, timestamp); +} + +long long DefaultMQPullConsumer::maxOffset(const MessageQueue& mq) +{ + return m_pDefaultMQPullConsumerImpl->maxOffset(mq); +} + +long long DefaultMQPullConsumer::minOffset(const MessageQueue& mq) +{ + return m_pDefaultMQPullConsumerImpl->minOffset(mq); +} + +long long DefaultMQPullConsumer::earliestMsgStoreTime(const MessageQueue& mq) +{ + return m_pDefaultMQPullConsumerImpl->earliestMsgStoreTime(mq); +} + +MessageExt* DefaultMQPullConsumer::viewMessage(const std::string& msgId) +{ + return m_pDefaultMQPullConsumerImpl->viewMessage(msgId); +} + +QueryResult DefaultMQPullConsumer::queryMessage(const std::string& topic, + const std::string& key, + int maxNum, + long long begin, + long long end) +{ + return m_pDefaultMQPullConsumerImpl->queryMessage(topic, key, maxNum, begin, end); +} +// MQadmin end + +AllocateMessageQueueStrategy* DefaultMQPullConsumer::getAllocateMessageQueueStrategy() +{ + return m_pAllocateMessageQueueStrategy; +} + +void DefaultMQPullConsumer::setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy) +{ + m_pAllocateMessageQueueStrategy = pAllocateMessageQueueStrategy; +} + +int DefaultMQPullConsumer::getBrokerSuspendMaxTimeMillis() +{ + return m_brokerSuspendMaxTimeMillis; +} + +void DefaultMQPullConsumer::setBrokerSuspendMaxTimeMillis(int brokerSuspendMaxTimeMillis) +{ + m_brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis; +} + +std::string DefaultMQPullConsumer::getConsumerGroup() +{ + return m_consumerGroup; +} + +void DefaultMQPullConsumer::setConsumerGroup(const std::string& consumerGroup) +{ + m_consumerGroup = consumerGroup; +} + +int DefaultMQPullConsumer::getConsumerPullTimeoutMillis() +{ + return m_consumerPullTimeoutMillis; +} + +void DefaultMQPullConsumer::setConsumerPullTimeoutMillis(int consumerPullTimeoutMillis) +{ + m_consumerPullTimeoutMillis = consumerPullTimeoutMillis; +} + +int DefaultMQPullConsumer::getConsumerTimeoutMillisWhenSuspend() +{ + return m_consumerTimeoutMillisWhenSuspend; +} + +void DefaultMQPullConsumer::setConsumerTimeoutMillisWhenSuspend(int consumerTimeoutMillisWhenSuspend) +{ + m_consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend; +} + +MessageModel DefaultMQPullConsumer::getMessageModel() +{ + return m_messageModel; +} + +void DefaultMQPullConsumer::setMessageModel(MessageModel messageModel) +{ + m_messageModel = messageModel; +} + +MessageQueueListener* DefaultMQPullConsumer::getMessageQueueListener() +{ + return m_pMessageQueueListener; +} + +void DefaultMQPullConsumer::setMessageQueueListener(MessageQueueListener* pMessageQueueListener) +{ + m_pMessageQueueListener = pMessageQueueListener; +} + +std::set<std::string> DefaultMQPullConsumer::getRegisterTopics() +{ + return m_registerTopics; +} + +void DefaultMQPullConsumer::setRegisterTopics(std::set<std::string> registerTopics) +{ + m_registerTopics = registerTopics; +} + +//MQConsumer +void DefaultMQPullConsumer::sendMessageBack(MessageExt& msg, int delayLevel) +{ + m_pDefaultMQPullConsumerImpl->sendMessageBack(msg, delayLevel, ""); +} + +void DefaultMQPullConsumer::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName) +{ + m_pDefaultMQPullConsumerImpl->sendMessageBack(msg, delayLevel, brokerName); +} + + + +std::set<MessageQueue>* DefaultMQPullConsumer::fetchSubscribeMessageQueues(const std::string& topic) +{ + return m_pDefaultMQPullConsumerImpl->fetchSubscribeMessageQueues(topic); +} + +void DefaultMQPullConsumer::start() +{ + m_pDefaultMQPullConsumerImpl->start(); +} + +void DefaultMQPullConsumer::shutdown() +{ + m_pDefaultMQPullConsumerImpl->shutdown(); +} +//MQConsumer end + +//MQPullConsumer +void DefaultMQPullConsumer::registerMessageQueueListener(const std::string& topic, MessageQueueListener* pListener) +{ + m_registerTopics.insert(topic); + + if (pListener) + { + m_pMessageQueueListener = pListener; + } +} + +PullResult* DefaultMQPullConsumer::pull(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums) +{ + return m_pDefaultMQPullConsumerImpl->pull(mq, subExpression, offset, maxNums); +} + +void DefaultMQPullConsumer::pull(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback) +{ + m_pDefaultMQPullConsumerImpl->pull(mq, subExpression, offset, maxNums, pPullCallback); +} + +PullResult* DefaultMQPullConsumer::pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums) +{ + return m_pDefaultMQPullConsumerImpl->pullBlockIfNotFound(mq, subExpression, offset, maxNums); +} + +void DefaultMQPullConsumer::pullBlockIfNotFound(MessageQueue& mq, + const std::string& subExpression, + long long offset, + int maxNums, + PullCallback* pPullCallback) +{ + m_pDefaultMQPullConsumerImpl->pullBlockIfNotFound(mq, subExpression, offset, maxNums, pPullCallback); +} + +void DefaultMQPullConsumer::updateConsumeOffset(MessageQueue& mq, long long offset) +{ + m_pDefaultMQPullConsumerImpl->updateConsumeOffset(mq, offset); +} + +long long DefaultMQPullConsumer::fetchConsumeOffset(MessageQueue& mq, bool fromStore) +{ + return m_pDefaultMQPullConsumerImpl->fetchConsumeOffset(mq, fromStore); +} + +std::set<MessageQueue>* DefaultMQPullConsumer::fetchMessageQueuesInBalance(const std::string& topic) +{ + return m_pDefaultMQPullConsumerImpl->fetchMessageQueuesInBalance(topic); +} +//MQPullConsumer end + +OffsetStore* DefaultMQPullConsumer::getOffsetStore() +{ + return m_pOffsetStore; +} + +void DefaultMQPullConsumer::setOffsetStore(OffsetStore* offsetStore) +{ + m_pOffsetStore = offsetStore; +} + +DefaultMQPullConsumerImpl* DefaultMQPullConsumer::getDefaultMQPullConsumerImpl() +{ + return m_pDefaultMQPullConsumerImpl; +} + +bool DefaultMQPullConsumer::isUnitMode() +{ + return m_unitMode; +} + +void DefaultMQPullConsumer::setUnitMode(bool isUnitMode) +{ + m_unitMode = isUnitMode; +} + +int DefaultMQPullConsumer::getMaxReconsumeTimes() +{ + return m_maxReconsumeTimes; +} + +void DefaultMQPullConsumer::setMaxReconsumeTimes(int maxReconsumeTimes) +{ + m_maxReconsumeTimes = maxReconsumeTimes; +} + + +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp new file mode 100755 index 0000000..d6465e9 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp @@ -0,0 +1,630 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "DefaultMQPullConsumerImpl.h" + +#include <iostream> +#include <string> +#include <set> +#include "DefaultMQPullConsumer.h" +#include "DefaultMQProducer.h" +#include "MQClientFactory.h" +#include "MQAdminImpl.h" +#include "RebalancePullImpl.h" +#include "MQClientAPIImpl.h" +#include "OffsetStore.h" +#include "MixAll.h" +#include "MQClientManager.h" +#include "LocalFileOffsetStore.h" +#include "RemoteBrokerOffsetStore.h" +#include "PullSysFlag.h" +#include "FilterAPI.h" +#include "PullAPIWrapper.h" +#include "MQClientException.h" +#include "Validators.h" +#include "ScopedLock.h" + +namespace rmq +{ + +DefaultMQPullConsumerImpl::DefaultMQPullConsumerImpl(DefaultMQPullConsumer* pDefaultMQPullConsumer) + : m_pDefaultMQPullConsumer(pDefaultMQPullConsumer), + m_serviceState(CREATE_JUST) +{ + m_pMQClientFactory = NULL; + m_pPullAPIWrapper = NULL; + m_pOffsetStore = NULL; + m_pRebalanceImpl = new RebalancePullImpl(this); +} + +DefaultMQPullConsumerImpl::~DefaultMQPullConsumerImpl() +{ + if (m_pRebalanceImpl) + delete m_pRebalanceImpl; + if (m_pPullAPIWrapper) + delete m_pPullAPIWrapper; + if (m_pOffsetStore) + delete m_pOffsetStore; + //delete m_pMQClientFactory; +} + +void DefaultMQPullConsumerImpl::start() +{ + RMQ_INFO("DefaultMQPullConsumerImpl::start()"); + switch (m_serviceState) + { + case CREATE_JUST: + { + RMQ_INFO("the consumer [{%s}] start beginning. messageModel={%s}", + m_pDefaultMQPullConsumer->getConsumerGroup().c_str(), + getMessageModelString(m_pDefaultMQPullConsumer->getMessageModel())); + + m_serviceState = START_FAILED; + checkConfig(); + copySubscription(); + + if (m_pDefaultMQPullConsumer->getMessageModel() == CLUSTERING) + { + m_pDefaultMQPullConsumer->changeInstanceNameToPID(); + } + + m_pMQClientFactory = MQClientManager::getInstance()->getAndCreateMQClientFactory(*m_pDefaultMQPullConsumer); + + m_pRebalanceImpl->setConsumerGroup(m_pDefaultMQPullConsumer->getConsumerGroup()); + m_pRebalanceImpl->setMessageModel(m_pDefaultMQPullConsumer->getMessageModel()); + m_pRebalanceImpl->setAllocateMessageQueueStrategy(m_pDefaultMQPullConsumer->getAllocateMessageQueueStrategy()); + m_pRebalanceImpl->setmQClientFactory(m_pMQClientFactory); + + m_pPullAPIWrapper = new PullAPIWrapper(m_pMQClientFactory, m_pDefaultMQPullConsumer->getConsumerGroup()); + + if (m_pDefaultMQPullConsumer->getOffsetStore() != NULL) + { + m_pOffsetStore = m_pDefaultMQPullConsumer->getOffsetStore(); + } + else + { + switch (m_pDefaultMQPullConsumer->getMessageModel()) + { + case BROADCASTING: + m_pOffsetStore = new LocalFileOffsetStore(m_pMQClientFactory, m_pDefaultMQPullConsumer->getConsumerGroup()); + break; + case CLUSTERING: + m_pOffsetStore = new RemoteBrokerOffsetStore(m_pMQClientFactory, m_pDefaultMQPullConsumer->getConsumerGroup()); + break; + default: + break; + } + } + + m_pOffsetStore->load(); + + bool registerOK = + m_pMQClientFactory->registerConsumer(m_pDefaultMQPullConsumer->getConsumerGroup(), this); + if (!registerOK) + { + m_serviceState = CREATE_JUST; + std::string str = "The consumer group[" + m_pDefaultMQPullConsumer->getConsumerGroup(); + str += "] has been created before, specify another name please."; + THROW_MQEXCEPTION(MQClientException, str, -1); + } + + m_pMQClientFactory->start(); + + m_serviceState = RUNNING; + } + break; + case RUNNING: + case START_FAILED: + case SHUTDOWN_ALREADY: + THROW_MQEXCEPTION(MQClientException, "The PullConsumer service state not OK, maybe started once, ", -1); + default: + break; + } +} + + +void DefaultMQPullConsumerImpl::shutdown() +{ + RMQ_DEBUG("DefaultMQPullConsumerImpl::shutdown()"); + switch (m_serviceState) + { + case CREATE_JUST: + break; + case RUNNING: + persistConsumerOffset(); + m_pMQClientFactory->unregisterConsumer(m_pDefaultMQPullConsumer->getConsumerGroup()); + m_pMQClientFactory->shutdown(); + + m_serviceState = SHUTDOWN_ALREADY; + break; + case SHUTDOWN_ALREADY: + break; + default: + break; + } +} + + +void DefaultMQPullConsumerImpl::createTopic(const std::string& key, const std::string& newTopic, int queueNum) +{ + makeSureStateOK(); + m_pMQClientFactory->getMQAdminImpl()->createTopic(key, newTopic, queueNum); +} + +long long DefaultMQPullConsumerImpl::fetchConsumeOffset(MessageQueue& mq, bool fromStore) +{ + makeSureStateOK(); + return m_pOffsetStore->readOffset(mq, fromStore ? READ_FROM_STORE : MEMORY_FIRST_THEN_STORE); +} + +std::set<MessageQueue>* DefaultMQPullConsumerImpl::fetchMessageQueuesInBalance(const std::string& topic) +{ + makeSureStateOK(); + std::set<MessageQueue>* mqResult = new std::set<MessageQueue>; + + kpr::ScopedRLock<kpr::RWMutex> lock(m_pRebalanceImpl->getProcessQueueTableLock()); + std::map<MessageQueue, ProcessQueue*>& mqTable = m_pRebalanceImpl->getProcessQueueTable(); + RMQ_FOR_EACH(mqTable, it) + { + if (it->first.getTopic() == topic) + { + mqResult->insert(it->first); + } + } + + return mqResult; +} + +std::vector<MessageQueue>* DefaultMQPullConsumerImpl::fetchPublishMessageQueues(const std::string& topic) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->fetchPublishMessageQueues(topic); +} + +std::set<MessageQueue>* DefaultMQPullConsumerImpl::fetchSubscribeMessageQueues(const std::string& topic) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->fetchSubscribeMessageQueues(topic); +} + +long long DefaultMQPullConsumerImpl::earliestMsgStoreTime(const MessageQueue& mq) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->earliestMsgStoreTime(mq); +} + +std::string DefaultMQPullConsumerImpl::groupName() +{ + return m_pDefaultMQPullConsumer->getConsumerGroup(); +} + +MessageModel DefaultMQPullConsumerImpl::messageModel() +{ + return m_pDefaultMQPullConsumer->getMessageModel(); +} + +ConsumeType DefaultMQPullConsumerImpl::consumeType() +{ + return CONSUME_ACTIVELY; +} + +ConsumeFromWhere DefaultMQPullConsumerImpl::consumeFromWhere() +{ + return CONSUME_FROM_LAST_OFFSET; +} + +std::set<SubscriptionData> DefaultMQPullConsumerImpl::subscriptions() +{ + //TODO + std::set<SubscriptionData> result; + return result; +} + +void DefaultMQPullConsumerImpl::doRebalance() +{ + if (m_pRebalanceImpl != NULL) + { + m_pRebalanceImpl->doRebalance(); + } +} + +void DefaultMQPullConsumerImpl::persistConsumerOffset() +{ + try + { + makeSureStateOK(); + + std::set<MessageQueue> mqs; + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_pRebalanceImpl->getProcessQueueTableLock()); + std::map<MessageQueue, ProcessQueue*> processQueueTable = m_pRebalanceImpl->getProcessQueueTable(); + RMQ_FOR_EACH(processQueueTable, it) + { + mqs.insert(it->first); + } + } + + m_pOffsetStore->persistAll(mqs); + } + catch (...) + { + RMQ_ERROR("group {%s} persistConsumerOffset exception", + m_pDefaultMQPullConsumer->getConsumerGroup().c_str()); + } +} + +void DefaultMQPullConsumerImpl::updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info) +{ + std::map<std::string, SubscriptionData>& subTable = m_pRebalanceImpl->getSubscriptionInner(); + + if (subTable.find(topic) != subTable.end()) + { + m_pRebalanceImpl->getTopicSubscribeInfoTable().insert(std::pair<std::string, std::set<MessageQueue> >(topic, info)); + } +} + +bool DefaultMQPullConsumerImpl::isSubscribeTopicNeedUpdate(const std::string& topic) +{ + std::map<std::string, SubscriptionData>& subTable = m_pRebalanceImpl->getSubscriptionInner(); + if (subTable.find(topic) != subTable.end()) + { + std::map<std::string, std::set<MessageQueue> >& mqs = + m_pRebalanceImpl->getTopicSubscribeInfoTable(); + return mqs.find(topic) == mqs.end(); + } + + return false; +} + +long long DefaultMQPullConsumerImpl::maxOffset(const MessageQueue& mq) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq); +} + +long long DefaultMQPullConsumerImpl::minOffset(const MessageQueue& mq) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->minOffset(mq); +} + +PullResult* DefaultMQPullConsumerImpl::pull(MessageQueue& mq, + const std::string& subExpression, + long long offset, + int maxNums) +{ + return pullSyncImpl(mq, subExpression, offset, maxNums, false); +} + +void DefaultMQPullConsumerImpl::pull(MessageQueue& mq, + const std::string& subExpression, + long long offset, + int maxNums, + PullCallback* pPullCallback) +{ + pullAsyncImpl(mq, subExpression, offset, maxNums, pPullCallback, false); +} + +PullResult* DefaultMQPullConsumerImpl::pullBlockIfNotFound(MessageQueue& mq, + const std::string& subExpression, + long long offset, + int maxNums) +{ + return pullSyncImpl(mq, subExpression, offset, maxNums, true); +} + +void DefaultMQPullConsumerImpl::pullBlockIfNotFound(MessageQueue& mq, + const std::string& subExpression, + long long offset, + int maxNums, + PullCallback* pPullCallback) +{ + pullAsyncImpl(mq, subExpression, offset, maxNums, pPullCallback, true); +} + +QueryResult DefaultMQPullConsumerImpl::queryMessage(const std::string& topic, + const std::string& key, + int maxNum, + long long begin, + long long end) +{ + makeSureStateOK(); + + QueryResult result(0, std::list<MessageExt*>()); + return m_pMQClientFactory->getMQAdminImpl()->queryMessage(topic, key, maxNum, begin, end); +} + +long long DefaultMQPullConsumerImpl::searchOffset(const MessageQueue& mq, long long timestamp) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp); +} + +void DefaultMQPullConsumerImpl::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName) +{ + return sendMessageBack(msg, delayLevel, brokerName, m_pDefaultMQPullConsumer->getConsumerGroup()); +} + + +void DefaultMQPullConsumerImpl::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName, + const std::string& consumerGroup) +{ + try + { + std::string brokerAddr = brokerName.empty() ? + socketAddress2IPPort(msg.getStoreHost()) : m_pMQClientFactory->findBrokerAddressInPublish(brokerName); + + m_pMQClientFactory->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, + consumerGroup.empty() ? m_pDefaultMQPullConsumer->getConsumerGroup() : consumerGroup, + delayLevel, + 3000); + } + catch (...) + { + RMQ_ERROR("sendMessageBack Exception, group: %s", m_pDefaultMQPullConsumer->getConsumerGroup().c_str()); + Message newMsg(MixAll::getRetryTopic(m_pDefaultMQPullConsumer->getConsumerGroup()), + msg.getBody(), msg.getBodyLen()); + + std::string originMsgId = msg.getProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID); + newMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, UtilAll::isBlank(originMsgId) ? msg.getMsgId() + : originMsgId); + + newMsg.setFlag(msg.getFlag()); + newMsg.setProperties(msg.getProperties()); + newMsg.putProperty(Message::PROPERTY_RETRY_TOPIC, msg.getTopic()); + + int reTimes = msg.getReconsumeTimes() + 1; + newMsg.putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(reTimes)); + newMsg.putProperty(Message::PROPERTY_MAX_RECONSUME_TIMES, UtilAll::toString(m_pDefaultMQPullConsumer->getMaxReconsumeTimes())); + newMsg.setDelayTimeLevel(3 + reTimes); + + m_pMQClientFactory->getDefaultMQProducer()->send(newMsg); + } +} + +void DefaultMQPullConsumerImpl::updateConsumeOffset(MessageQueue& mq, long long offset) +{ + makeSureStateOK(); + m_pOffsetStore->updateOffset(mq, offset, false); +} + +MessageExt* DefaultMQPullConsumerImpl::viewMessage(const std::string& msgId) +{ + makeSureStateOK(); + + return m_pMQClientFactory->getMQAdminImpl()->viewMessage(msgId); +} + +DefaultMQPullConsumer* DefaultMQPullConsumerImpl::getDefaultMQPullConsumer() +{ + return m_pDefaultMQPullConsumer; +} + +OffsetStore* DefaultMQPullConsumerImpl::getOffsetStore() +{ + return m_pOffsetStore; +} + +void DefaultMQPullConsumerImpl::setOffsetStore(OffsetStore* pOffsetStore) +{ + m_pOffsetStore = pOffsetStore; +} + +void DefaultMQPullConsumerImpl::makeSureStateOK() +{ + if (m_serviceState != RUNNING) + { + THROW_MQEXCEPTION(MQClientException, "The consumer service state not OK, ", -1); + } +} + +PullResult* DefaultMQPullConsumerImpl::pullSyncImpl(MessageQueue& mq, + const std::string& subExpression, + long long offset, + int maxNums, + bool block) +{ + makeSureStateOK(); + + if (offset < 0) + { + THROW_MQEXCEPTION(MQClientException, "offset < 0", -1); + } + + if (maxNums <= 0) + { + THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1); + } + + subscriptionAutomatically(mq.getTopic()); + + int sysFlag = PullSysFlag::buildSysFlag(false, block, true); + + SubscriptionDataPtr subscriptionData = NULL; + try + { + subscriptionData = FilterAPI::buildSubscriptionData(mq.getTopic(), subExpression); + } + catch (...) + { + THROW_MQEXCEPTION(MQClientException, "parse subscription error", -1); + } + + int timeoutMillis = + block ? m_pDefaultMQPullConsumer->getConsumerTimeoutMillisWhenSuspend() + : m_pDefaultMQPullConsumer->getConsumerPullTimeoutMillis(); + + PullResult* pullResult = m_pPullAPIWrapper->pullKernelImpl(// + mq, // 1 + subscriptionData->getSubString(), // 2 + 0L, // 3 + offset, // 4 + maxNums, // 5 + sysFlag, // 6 + 0, // 7 + m_pDefaultMQPullConsumer->getBrokerSuspendMaxTimeMillis(), // 8 + timeoutMillis, // 9 + SYNC, // 10 + NULL// 11 + ); + + return m_pPullAPIWrapper->processPullResult(mq, *pullResult, *subscriptionData); +} + +void DefaultMQPullConsumerImpl::subscriptionAutomatically(const std::string& topic) +{ + std::map<std::string, SubscriptionData>& sd = m_pRebalanceImpl->getSubscriptionInner(); + std::map<std::string, SubscriptionData>::iterator it = sd.find(topic); + + if (it == sd.end()) + { + try + { + SubscriptionDataPtr subscriptionData = + FilterAPI::buildSubscriptionData(topic, SubscriptionData::SUB_ALL); + sd[topic] = *subscriptionData; + } + catch (...) + { + RMQ_WARN("FilterAPI::buildSubscriptionData exception"); + } + } +} + +void DefaultMQPullConsumerImpl::pullAsyncImpl(// + MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, + PullCallback* pPullCallback,// + bool block) +{ + makeSureStateOK(); + + if (offset < 0) + { + THROW_MQEXCEPTION(MQClientException, "offset < 0", -1); + } + + if (maxNums <= 0) + { + THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1); + } + + if (pPullCallback == NULL) + { + THROW_MQEXCEPTION(MQClientException, "pullCallback is null", -1); + } + + subscriptionAutomatically(mq.getTopic()); + try + { + int sysFlag = PullSysFlag::buildSysFlag(false, block, true); + + SubscriptionDataPtr subscriptionData = NULL; + try + { + subscriptionData = FilterAPI::buildSubscriptionData(mq.getTopic(), subExpression); + } + catch (...) + { + THROW_MQEXCEPTION(MQClientException, "parse subscription error", -1); + } + + int timeoutMillis = + block ? m_pDefaultMQPullConsumer->getConsumerTimeoutMillisWhenSuspend() + : m_pDefaultMQPullConsumer->getConsumerPullTimeoutMillis(); + DefaultMQPullConsumerImplCallback* callback = + new DefaultMQPullConsumerImplCallback(*subscriptionData, + mq, this, pPullCallback); + + m_pPullAPIWrapper->pullKernelImpl( + mq, // 1 + subscriptionData->getSubString(), // 2 + 0L, // 3 + offset, // 4 + maxNums, // 5 + sysFlag, // 6 + 0, // 7 + m_pDefaultMQPullConsumer->getBrokerSuspendMaxTimeMillis(), // 8 + timeoutMillis, // 9 + ASYNC, // 10 + callback// 11 + ); + } + catch (const MQBrokerException& e) + { + THROW_MQEXCEPTION(MQClientException, "pullAsync unknow exception", -1); + } +} + + +void DefaultMQPullConsumerImpl::copySubscription() +{ + try + { + std::set<std::string> registerTopics = m_pDefaultMQPullConsumer->getRegisterTopics(); + std::set<std::string>::iterator it = registerTopics.begin(); + + for (; it != registerTopics.end(); it++) + { + SubscriptionDataPtr subscriptionData = + FilterAPI::buildSubscriptionData(*it, SubscriptionData::SUB_ALL); + m_pRebalanceImpl->getSubscriptionInner()[*it] = *subscriptionData; + } + } + catch (...) + { + THROW_MQEXCEPTION(MQClientException, "subscription exception", -1); + } +} + + +void DefaultMQPullConsumerImpl::checkConfig() +{ + // check consumerGroup + Validators::checkGroup(m_pDefaultMQPullConsumer->getConsumerGroup()); + + // consumerGroup + if (m_pDefaultMQPullConsumer->getConsumerGroup() == MixAll::DEFAULT_CONSUMER_GROUP) + { + THROW_MQEXCEPTION(MQClientException, "consumerGroup can not equal " + + MixAll::DEFAULT_CONSUMER_GROUP // + + ", please specify another one.", -1); + } + + if (m_pDefaultMQPullConsumer->getMessageModel() != BROADCASTING + && m_pDefaultMQPullConsumer->getMessageModel() != CLUSTERING) + { + THROW_MQEXCEPTION(MQClientException, "messageModel is valid ", -1); + } + + // allocateMessageQueueStrategy + if (m_pDefaultMQPullConsumer->getAllocateMessageQueueStrategy() == NULL) + { + THROW_MQEXCEPTION(MQClientException, "allocateMessageQueueStrategy is null", -1); + } +} + +ServiceState DefaultMQPullConsumerImpl::getServiceState() +{ + return m_serviceState; +} + +void DefaultMQPullConsumerImpl::setServiceState(ServiceState serviceState) +{ + m_serviceState = serviceState; +} + + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h new file mode 100755 index 0000000..171565c --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h @@ -0,0 +1,174 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __DEFAULTMQPULLCONSUMERIMPL_H__ +#define __DEFAULTMQPULLCONSUMERIMPL_H__ + +#include <string> +#include <set> +#include <map> +#include <vector> +#include "MQConsumerInner.h" +#include "MessageExt.h" +#include "QueryResult.h" +#include "ServiceState.h" +#include "PullRequest.h" +#include "MessageQueue.h" +#include "PullResult.h" +#include "PullCallback.h" +#include "PullAPIWrapper.h" + +namespace rmq +{ +class DefaultMQPullConsumer; +class PullCallback; +class OffsetStore; +class RebalanceImpl; +class MQClientFactory; +class PullAPIWrapper; + +/** +* PullConsumer imp +*/ +class DefaultMQPullConsumerImpl : public MQConsumerInner +{ +public: + DefaultMQPullConsumerImpl(DefaultMQPullConsumer *pDefaultMQPullConsumer); + ~DefaultMQPullConsumerImpl(); + void createTopic(const std::string &key, const std::string &newTopic, + int queueNum); + long long fetchConsumeOffset(MessageQueue &mq, bool fromStore); + std::set<MessageQueue> *fetchMessageQueuesInBalance(const std::string &topic); + std::vector<MessageQueue> *fetchPublishMessageQueues(const std::string &topic); + std::set<MessageQueue> *fetchSubscribeMessageQueues(const std::string &topic); + long long earliestMsgStoreTime(const MessageQueue &mq); + std::string groupName(); + MessageModel messageModel(); + ConsumeType consumeType(); + ConsumeFromWhere consumeFromWhere(); + std::set<SubscriptionData> subscriptions(); + void doRebalance(); + void persistConsumerOffset(); + void updateTopicSubscribeInfo(const std::string &topic, + const std::set<MessageQueue> &info); + bool isSubscribeTopicNeedUpdate(const std::string &topic); + long long maxOffset(const MessageQueue &mq); + long long minOffset(const MessageQueue &mq); + + PullResult *pull(MessageQueue &mq, + const std::string &subExpression, + long long offset, + int maxNums); + + void pull(MessageQueue &mq, + const std::string &subExpression, + long long offset, + int maxNums, + PullCallback *pPullCallback); + + PullResult *pullBlockIfNotFound(MessageQueue &mq, + const std::string &subExpression, + long long offset, int maxNums); + + void pullBlockIfNotFound(MessageQueue &mq, + const std::string &subExpression, + long long offset, int maxNums, + PullCallback *pPullCallback); + + QueryResult queryMessage(const std::string &topic, + const std::string &key, + int maxNum, + long long begin, + long long end); + + long long searchOffset(const MessageQueue &mq, long long timestamp); + void sendMessageBack(MessageExt &msg, int delayLevel, + const std::string &brokerName); + void sendMessageBack(MessageExt &msg, int delayLevel, + const std::string &brokerName, const std::string &consumerGroup); + void shutdown(); + void updateConsumeOffset(MessageQueue &mq, long long offset); + MessageExt *viewMessage(const std::string &msgId); + DefaultMQPullConsumer *getDefaultMQPullConsumer(); + OffsetStore *getOffsetStore(); + void setOffsetStore(OffsetStore *pOffsetStore); + void start(); + + ServiceState getServiceState(); + void setServiceState(ServiceState serviceState); + +private: + void makeSureStateOK(); + void subscriptionAutomatically(const std::string &topic); + void copySubscription(); + void checkConfig(); + + PullResult *pullSyncImpl(MessageQueue &mq, + const std::string &subExpression, + long long offset, + int maxNums, + bool block) ; + void pullAsyncImpl(MessageQueue &mq, + const std::string &subExpression, + long long offset, + int maxNums, + PullCallback *pPullCallback, + bool block); + +private: + DefaultMQPullConsumer *m_pDefaultMQPullConsumer; + ServiceState m_serviceState; + MQClientFactory *m_pMQClientFactory; + PullAPIWrapper *m_pPullAPIWrapper; + OffsetStore *m_pOffsetStore; + RebalanceImpl *m_pRebalanceImpl; + friend class DefaultMQPullConsumerImplCallback; +}; + +class DefaultMQPullConsumerImplCallback : public PullCallback +{ +public: + DefaultMQPullConsumerImplCallback(SubscriptionData &subscriptionData, + MessageQueue &mq, + DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl, + PullCallback *pCallback) + : m_subscriptionData(subscriptionData), + m_mq(mq), + m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl), + m_pCallback(pCallback) + { + } + + void onSuccess(PullResult &pullResult) + { + m_pCallback->onSuccess( + *m_pDefaultMQPullConsumerImpl->m_pPullAPIWrapper-> + processPullResult(m_mq, pullResult, m_subscriptionData)); + } + + void onException(MQException &e) + { + m_pCallback->onException(e); + } + +private: + SubscriptionData m_subscriptionData; + MessageQueue m_mq; + DefaultMQPullConsumerImpl *m_pDefaultMQPullConsumerImpl; + PullCallback *m_pCallback; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp new file mode 100755 index 0000000..45ee907 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp @@ -0,0 +1,399 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "DefaultMQPushConsumer.h" +#include <list> +#include <string> + +#include "DefaultMQPushConsumerImpl.h" +#include "MessageQueue.h" +#include "MessageExt.h" +#include "ClientConfig.h" +#include "ConsumerStatManage.h" +#include "MixAll.h" +#include "AllocateMessageQueueStrategyInner.h" + +namespace rmq +{ + +class AllocateMessageQueueStrategy; + +DefaultMQPushConsumer::DefaultMQPushConsumer() +{ + m_consumerGroup = MixAll::DEFAULT_CONSUMER_GROUP; + m_messageModel = CLUSTERING; + m_consumeFromWhere = CONSUME_FROM_LAST_OFFSET; + m_pAllocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); + m_pMessageListener = NULL; + m_consumeThreadMin = 5; + m_consumeThreadMax = 25; + m_consumeConcurrentlyMaxSpan = 2000; + m_pullThresholdForQueue = 1000; + m_pullInterval = 0; + m_consumeMessageBatchMaxSize = 1; + m_pullBatchSize = 32; + m_postSubscriptionWhenPull = false; + m_unitMode = false; + m_maxReconsumeTimes = 16; + m_suspendCurrentQueueTimeMillis = 1000; + m_consumeTimeout = 15; + m_pOffsetStore = NULL; + m_pDefaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this); +} + +DefaultMQPushConsumer::DefaultMQPushConsumer(const std::string& consumerGroup) +{ + m_consumerGroup = consumerGroup; + m_messageModel = CLUSTERING; + m_consumeFromWhere = CONSUME_FROM_LAST_OFFSET; + m_pAllocateMessageQueueStrategy = new AllocateMessageQueueAveragely(); + m_pMessageListener = NULL; + m_consumeThreadMin = 5; + m_consumeThreadMax = 25; + m_consumeConcurrentlyMaxSpan = 2000; + m_pullThresholdForQueue = 1000; + m_pullInterval = 0; + m_consumeMessageBatchMaxSize = 1; + m_pullBatchSize = 32; + m_postSubscriptionWhenPull = false; + m_unitMode = false; + m_maxReconsumeTimes = 16; + m_suspendCurrentQueueTimeMillis = 1000; + m_consumeTimeout = 15; + m_pOffsetStore = NULL; + m_pDefaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this); +} + +DefaultMQPushConsumer::~DefaultMQPushConsumer() +{ + delete m_pAllocateMessageQueueStrategy; + delete m_pDefaultMQPushConsumerImpl; +} + +//MQAdmin +void DefaultMQPushConsumer::createTopic(const std::string& key, const std::string& newTopic, int queueNum) +{ + m_pDefaultMQPushConsumerImpl->createTopic(key, newTopic, queueNum); +} + +long long DefaultMQPushConsumer::searchOffset(const MessageQueue& mq, long long timestamp) +{ + return m_pDefaultMQPushConsumerImpl->searchOffset(mq, timestamp); +} + +long long DefaultMQPushConsumer::maxOffset(const MessageQueue& mq) +{ + return m_pDefaultMQPushConsumerImpl->maxOffset(mq); +} + +long long DefaultMQPushConsumer::minOffset(const MessageQueue& mq) +{ + return m_pDefaultMQPushConsumerImpl->minOffset(mq); +} + +long long DefaultMQPushConsumer::earliestMsgStoreTime(const MessageQueue& mq) +{ + return m_pDefaultMQPushConsumerImpl->earliestMsgStoreTime(mq); +} + +MessageExt* DefaultMQPushConsumer::viewMessage(const std::string& msgId) +{ + return m_pDefaultMQPushConsumerImpl->viewMessage(msgId); +} + +QueryResult DefaultMQPushConsumer::queryMessage(const std::string& topic, + const std::string& key, + int maxNum, + long long begin, + long long end) +{ + return m_pDefaultMQPushConsumerImpl->queryMessage(topic, key, maxNum, begin, end); +} +// MQadmin end + +AllocateMessageQueueStrategy* DefaultMQPushConsumer::getAllocateMessageQueueStrategy() +{ + return m_pAllocateMessageQueueStrategy; +} + +void DefaultMQPushConsumer::setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy) +{ + m_pAllocateMessageQueueStrategy = pAllocateMessageQueueStrategy; +} + +int DefaultMQPushConsumer::getConsumeConcurrentlyMaxSpan() +{ + return m_consumeConcurrentlyMaxSpan; +} + +void DefaultMQPushConsumer::setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) +{ + m_consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan; +} + +ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere() +{ + return m_consumeFromWhere; +} + +void DefaultMQPushConsumer::setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) +{ + m_consumeFromWhere = consumeFromWhere; +} + +int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize() +{ + return m_consumeMessageBatchMaxSize; +} + +void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) +{ + m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; +} + +std::string DefaultMQPushConsumer::getConsumerGroup() +{ + return m_consumerGroup; +} + +void DefaultMQPushConsumer::setConsumerGroup(const std::string& consumerGroup) +{ + m_consumerGroup = consumerGroup; +} + +int DefaultMQPushConsumer::getConsumeThreadMax() +{ + return m_consumeThreadMax; +} + +void DefaultMQPushConsumer::setConsumeThreadMax(int consumeThreadMax) +{ + m_consumeThreadMax = consumeThreadMax; +} + +int DefaultMQPushConsumer::getConsumeThreadMin() +{ + return m_consumeThreadMin; +} + +void DefaultMQPushConsumer::setConsumeThreadMin(int consumeThreadMin) +{ + m_consumeThreadMin = consumeThreadMin; +} + +DefaultMQPushConsumerImpl* DefaultMQPushConsumer::getDefaultMQPushConsumerImpl() +{ + return m_pDefaultMQPushConsumerImpl; +} + +MessageListener* DefaultMQPushConsumer::getMessageListener() +{ + return m_pMessageListener; +} + +void DefaultMQPushConsumer::setMessageListener(MessageListener* pMessageListener) +{ + m_pMessageListener = pMessageListener; +} + +MessageModel DefaultMQPushConsumer::getMessageModel() +{ + return m_messageModel; +} + +void DefaultMQPushConsumer::setMessageModel(MessageModel messageModel) +{ + m_messageModel = messageModel; +} + +int DefaultMQPushConsumer::getPullBatchSize() +{ + return m_pullBatchSize; +} + +void DefaultMQPushConsumer::setPullBatchSize(int pullBatchSize) +{ + m_pullBatchSize = pullBatchSize; +} + +long DefaultMQPushConsumer::getPullInterval() +{ + return m_pullInterval; +} + +void DefaultMQPushConsumer::setPullInterval(long pullInterval) +{ + m_pullInterval = pullInterval; +} + +int DefaultMQPushConsumer::getPullThresholdForQueue() +{ + return m_pullThresholdForQueue; +} + +void DefaultMQPushConsumer::setPullThresholdForQueue(int pullThresholdForQueue) +{ + m_pullThresholdForQueue = pullThresholdForQueue; +} + +std::map<std::string, std::string>& DefaultMQPushConsumer::getSubscription() +{ + return m_subscription; +} + +void DefaultMQPushConsumer::setSubscription(const std::map<std::string, std::string>& subscription) +{ + m_subscription = subscription; +} + +//MQConsumer +void DefaultMQPushConsumer::sendMessageBack(MessageExt& msg, int delayLevel) +{ + m_pDefaultMQPushConsumerImpl->sendMessageBack(msg, delayLevel, ""); +} + +void DefaultMQPushConsumer::sendMessageBack(MessageExt& msg, int delayLevel, const std::string brokerName) +{ + m_pDefaultMQPushConsumerImpl->sendMessageBack(msg, delayLevel, brokerName); +} + + +std::set<MessageQueue>* DefaultMQPushConsumer::fetchSubscribeMessageQueues(const std::string& topic) +{ + return m_pDefaultMQPushConsumerImpl->fetchSubscribeMessageQueues(topic); +} + +void DefaultMQPushConsumer::start() +{ + m_pDefaultMQPushConsumerImpl->start(); +} + +void DefaultMQPushConsumer::shutdown() +{ + m_pDefaultMQPushConsumerImpl->shutdown(); +} +//MQConsumer end + +//MQPushConsumer +void DefaultMQPushConsumer::registerMessageListener(MessageListener* pMessageListener) +{ + m_pMessageListener = pMessageListener; + m_pDefaultMQPushConsumerImpl->registerMessageListener(pMessageListener); +} + +void DefaultMQPushConsumer::subscribe(const std::string& topic, const std::string& subExpression) +{ + m_pDefaultMQPushConsumerImpl->subscribe(topic, subExpression); +} + +void DefaultMQPushConsumer::unsubscribe(const std::string& topic) +{ + m_pDefaultMQPushConsumerImpl->unsubscribe(topic); +} + +void DefaultMQPushConsumer::updateCorePoolSize(int corePoolSize) +{ + m_pDefaultMQPushConsumerImpl->updateCorePoolSize(corePoolSize); +} + +void DefaultMQPushConsumer::suspend() +{ + m_pDefaultMQPushConsumerImpl->suspend(); +} + +void DefaultMQPushConsumer::resume() +{ + m_pDefaultMQPushConsumerImpl->resume(); +} +//MQPushConsumer end + +OffsetStore* DefaultMQPushConsumer::getOffsetStore() +{ + return m_pOffsetStore; +} + +void DefaultMQPushConsumer::setOffsetStore(OffsetStore* pOffsetStore) +{ + m_pOffsetStore = pOffsetStore; +} + +std::string DefaultMQPushConsumer::getConsumeTimestamp() { + return m_consumeTimestamp; +} + +void DefaultMQPushConsumer::setConsumeTimestamp(std::string consumeTimestamp) { + m_consumeTimestamp = consumeTimestamp; +} + +bool DefaultMQPushConsumer::isPostSubscriptionWhenPull() +{ + return m_postSubscriptionWhenPull; +} + + +void DefaultMQPushConsumer::setPostSubscriptionWhenPull(bool postSubscriptionWhenPull) +{ + m_postSubscriptionWhenPull = postSubscriptionWhenPull; +} + + +bool DefaultMQPushConsumer::isUnitMode() +{ + return m_unitMode; +} + + +void DefaultMQPushConsumer::setUnitMode(bool isUnitMode) +{ + m_unitMode = isUnitMode; +} + +int DefaultMQPushConsumer::getMaxReconsumeTimes() +{ + return m_maxReconsumeTimes; +} + + +void DefaultMQPushConsumer::setMaxReconsumeTimes(int maxReconsumeTimes) +{ + m_maxReconsumeTimes = maxReconsumeTimes; +} + + +int DefaultMQPushConsumer::getSuspendCurrentQueueTimeMillis() +{ + return m_suspendCurrentQueueTimeMillis; +} + + +void DefaultMQPushConsumer::setSuspendCurrentQueueTimeMillis(int suspendCurrentQueueTimeMillis) +{ + m_suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis; +} + + +int DefaultMQPushConsumer::getConsumeTimeout() +{ + return m_consumeTimeout; +} + +void DefaultMQPushConsumer::setConsumeTimeout(int consumeTimeout) +{ + m_consumeTimeout = consumeTimeout; +} + + +}
