http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullRequest.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/PullRequest.cpp b/rocketmq-client4cpp/src/consumer/PullRequest.cpp new file mode 100755 index 0000000..b8650c6 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/PullRequest.cpp @@ -0,0 +1,108 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "PullRequest.h" +#include "UtilAll.h" + +namespace rmq +{ + +PullRequest::~PullRequest() +{ + +} + +std::string PullRequest::getConsumerGroup() +{ + return m_consumerGroup; +} + +void PullRequest::setConsumerGroup(const std::string& consumerGroup) +{ + m_consumerGroup = consumerGroup; +} + +MessageQueue& PullRequest::getMessageQueue() +{ + return m_messageQueue; +} + +void PullRequest::setMessageQueue(const MessageQueue& messageQueue) +{ + m_messageQueue = messageQueue; +} + +long long PullRequest::getNextOffset() +{ + return m_nextOffset; +} + +void PullRequest::setNextOffset(long long nextOffset) +{ + m_nextOffset = nextOffset; +} + +int PullRequest::hashCode() +{ + /* + final int prime = 31; + int result = 1; + result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode()); + result = prime * result + ((messageQueue == null) ? 
0 : messageQueue.hashCode()); + return result; + */ + std::stringstream ss; + ss << m_consumerGroup + << m_messageQueue.hashCode(); + return UtilAll::hashCode(ss.str()); +} + +std::string PullRequest::toString() const +{ + std::stringstream ss; + ss << "{consumerGroup=" << m_consumerGroup + << ",messageQueue=" << m_messageQueue.toString() + << ",nextOffset=" << m_nextOffset << "}"; + return ss.str(); +} + + +bool PullRequest::operator==(const PullRequest& other) +{ + if (m_consumerGroup != other.m_consumerGroup) + { + return false; + } + + if (!(m_messageQueue == other.m_messageQueue)) + { + return false; + } + + return true; +} + +ProcessQueue* PullRequest::getProcessQueue() +{ + return m_pProcessQueue; +} + +void PullRequest::setProcessQueue(ProcessQueue* pProcessQueue) +{ + m_pProcessQueue = pProcessQueue; +} + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullRequest.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/PullRequest.h b/rocketmq-client4cpp/src/consumer/PullRequest.h new file mode 100755 index 0000000..3fb8367 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/PullRequest.h @@ -0,0 +1,59 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef __PULLREQUEST_H__ +#define __PULLREQUEST_H__ + +#include <string> +#include <sstream> + +#include "MessageQueue.h" +#include "ProcessQueue.h" + +namespace rmq +{ + class PullRequest + { + public: + virtual ~PullRequest(); + + std::string getConsumerGroup(); + void setConsumerGroup(const std::string& consumerGroup); + + MessageQueue& getMessageQueue(); + void setMessageQueue(const MessageQueue& messageQueue); + + long long getNextOffset(); + void setNextOffset(long long nextOffset); + + int hashCode(); + std::string toString() const; + + bool operator==(const PullRequest& other); + + ProcessQueue* getProcessQueue(); + void setProcessQueue(ProcessQueue* pProcessQueue); + + private: + std::string m_consumerGroup; + MessageQueue m_messageQueue; + + ProcessQueue* m_pProcessQueue; + long long m_nextOffset; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullResultExt.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/PullResultExt.h b/rocketmq-client4cpp/src/consumer/PullResultExt.h new file mode 100755 index 0000000..24235b2 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/PullResultExt.h @@ -0,0 +1,53 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef __PULLRESULTEXT_H__ +#define __PULLRESULTEXT_H__ + +#include "PullResult.h" + +namespace rmq +{ + + struct PullResultExt : public PullResult + { + PullResultExt(PullStatus pullStatus, + long long nextBeginOffset, + long long minOffset, + long long maxOffset, + std::list<MessageExt*>& msgFoundList, + long suggestWhichBrokerId, + const char* messageBinary, + int messageBinaryLen) + : PullResult(pullStatus, + nextBeginOffset, + minOffset, + maxOffset, + msgFoundList), + suggestWhichBrokerId(suggestWhichBrokerId), + messageBinary(messageBinary), + messageBinaryLen(messageBinaryLen) + { + + } + + long suggestWhichBrokerId; + const char* messageBinary; + int messageBinaryLen; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp new file mode 100755 index 0000000..efdc1cc --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp @@ -0,0 +1,613 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#include "RebalanceImpl.h" +#include "AllocateMessageQueueStrategy.h" +#include "MQClientFactory.h" +#include "MixAll.h" +#include "LockBatchBody.h" +#include "MQClientAPIImpl.h" +#include "KPRUtil.h" +#include "ScopedLock.h" + +namespace rmq +{ + +RebalanceImpl::RebalanceImpl(const std::string& consumerGroup, + MessageModel messageModel, + AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy, + MQClientFactory* pMQClientFactory) + : m_consumerGroup(consumerGroup), + m_messageModel(messageModel), + m_pAllocateMessageQueueStrategy(pAllocateMessageQueueStrategy), + m_pMQClientFactory(pMQClientFactory) +{ + +} + +RebalanceImpl::~RebalanceImpl() +{ +} + +void RebalanceImpl::unlock(MessageQueue& mq, bool oneway) +{ + FindBrokerResult findBrokerResult = + m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll::MASTER_ID, true); + if (!findBrokerResult.brokerAddr.empty()) + { + UnlockBatchRequestBody* requestBody = new UnlockBatchRequestBody(); + requestBody->setConsumerGroup(m_consumerGroup); + requestBody->setClientId(m_pMQClientFactory->getClientId()); + requestBody->getMqSet().insert(mq); + + try + { + m_pMQClientFactory->getMQClientAPIImpl()->unlockBatchMQ(findBrokerResult.brokerAddr, + requestBody, 1000, oneway); + } + catch (...) 
+ { + RMQ_ERROR("unlockBatchMQ exception, MQ: {%s}" , mq.toString().c_str()); + } + } +} + +void RebalanceImpl::unlockAll(bool oneway) +{ + std::map<std::string, std::set<MessageQueue> > brokerMqs = buildProcessQueueTableByBrokerName(); + std::map<std::string, std::set<MessageQueue> >::iterator it = brokerMqs.begin(); + + for (; it != brokerMqs.end(); it++) + { + std::string brokerName = it->first; + std::set<MessageQueue> mqs = it->second; + + if (mqs.empty()) + { + continue; + } + + FindBrokerResult findBrokerResult = + m_pMQClientFactory->findBrokerAddressInSubscribe(brokerName, MixAll::MASTER_ID, true); + + if (!findBrokerResult.brokerAddr.empty()) + { + UnlockBatchRequestBody* requestBody = new UnlockBatchRequestBody(); + requestBody->setConsumerGroup(m_consumerGroup); + requestBody->setClientId(m_pMQClientFactory->getClientId()); + requestBody->setMqSet(mqs); + + try + { + m_pMQClientFactory->getMQClientAPIImpl()->unlockBatchMQ(findBrokerResult.brokerAddr, + requestBody, 1000, oneway); + + kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock); + std::set<MessageQueue>::iterator itm = mqs.begin(); + for (; itm != mqs.end(); itm++) + { + std::map<MessageQueue, ProcessQueue*>::iterator itp = m_processQueueTable.find(*itm); + if (itp != m_processQueueTable.end()) + { + itp->second->setLocked(false); + RMQ_INFO("the message queue unlock OK, Group: {%s}, MQ: {%s}", + m_consumerGroup.c_str(), (*itm).toString().c_str()); + } + } + } + catch (...) 
+ { + RMQ_ERROR("unlockBatchMQ exception, mqs.size: {%u} ", (unsigned)mqs.size()); + } + } + } +} + +bool RebalanceImpl::lock(MessageQueue& mq) +{ + FindBrokerResult findBrokerResult = + m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll::MASTER_ID, true); + if (!findBrokerResult.brokerAddr.empty()) + { + LockBatchRequestBody* requestBody = new LockBatchRequestBody(); + requestBody->setConsumerGroup(m_consumerGroup); + requestBody->setClientId(m_pMQClientFactory->getClientId()); + requestBody->getMqSet().insert(mq); + + try + { + std::set<MessageQueue> lockedMq = + m_pMQClientFactory->getMQClientAPIImpl()->lockBatchMQ( + findBrokerResult.brokerAddr, requestBody, 1000); + + std::set<MessageQueue>::iterator it = lockedMq.begin(); + for (; it != lockedMq.end(); it++) + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock); + MessageQueue mmqq = *it; + std::map<MessageQueue, ProcessQueue*>::iterator itt = m_processQueueTable.find(mmqq); + if (itt != m_processQueueTable.end()) + { + itt->second->setLocked(true); + itt->second->setLastLockTimestamp(KPRUtil::GetCurrentTimeMillis()); + } + } + + it = lockedMq.find(mq); + bool lockOK = (it != lockedMq.end()); + + RMQ_INFO("the message queue lock {%s}, {%s}, {%s}",// + (lockOK ? "OK" : "Failed"), // + m_consumerGroup.c_str(), // + mq.toString().c_str()); + return lockOK; + } + catch (...) 
+ { + RMQ_ERROR("lockBatchMQ exception, MQ: {%s}", mq.toString().c_str()); + } + } + + return false; +} + +void RebalanceImpl::lockAll() +{ + std::map<std::string, std::set<MessageQueue> > brokerMqs = buildProcessQueueTableByBrokerName(); + + std::map<std::string, std::set<MessageQueue> >::iterator it = brokerMqs.begin(); + for (; it != brokerMqs.end(); it++) + { + std::string brokerName = it->first; + std::set<MessageQueue> mqs = it->second; + + if (mqs.empty()) + { + continue; + } + + FindBrokerResult findBrokerResult = + m_pMQClientFactory->findBrokerAddressInSubscribe(brokerName, MixAll::MASTER_ID, true); + if (!findBrokerResult.brokerAddr.empty()) + { + LockBatchRequestBody* requestBody = new LockBatchRequestBody(); + requestBody->setConsumerGroup(m_consumerGroup); + requestBody->setClientId(m_pMQClientFactory->getClientId()); + requestBody->setMqSet(mqs); + + try + { + std::set<MessageQueue> lockOKMQSet = + m_pMQClientFactory->getMQClientAPIImpl()->lockBatchMQ( + findBrokerResult.brokerAddr, requestBody, 1000); + + std::set<MessageQueue>::iterator its = lockOKMQSet.begin(); + for (; its != lockOKMQSet.end(); its++) + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock); + MessageQueue mq = *its; + std::map<MessageQueue, ProcessQueue*>::iterator itt = m_processQueueTable.find(mq); + if (itt != m_processQueueTable.end()) + { + ProcessQueue* processQueue = itt->second; + if (!processQueue->isLocked()) + { + RMQ_INFO("the message queue locked OK, Group: {%s}, MQ: %s", + m_consumerGroup.c_str(), + mq.toString().c_str()); + } + + processQueue->setLocked(true); + processQueue->setLastLockTimestamp(KPRUtil::GetCurrentTimeMillis()); + } + } + + its = mqs.begin(); + for (; its != mqs.end(); its++) + { + MessageQueue mq = *its; + std::set<MessageQueue>::iterator itf = lockOKMQSet.find(mq); + if (itf == lockOKMQSet.end()) + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock); + std::map<MessageQueue, ProcessQueue*>::iterator itt = 
m_processQueueTable.find(mq); + if (itt != m_processQueueTable.end()) + { + itt->second->setLocked(false); + RMQ_WARN("the message queue locked Failed, Group: {%s}, MQ: %s", + m_consumerGroup.c_str(), + mq.toString().c_str()); + } + } + } + } + catch (std::exception& e) + { + RMQ_ERROR("lockBatchMQ exception: %s", e.what()); + } + } + } +} + +void RebalanceImpl::doRebalance() +{ + std::map<std::string, SubscriptionData> subTable = getSubscriptionInner(); + std::map<std::string, SubscriptionData>::iterator it = subTable.begin(); + for (; it != subTable.end(); it++) + { + std::string topic = it->first; + try + { + rebalanceByTopic(topic); + } + catch (std::exception& e) + { + if (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != 0) + { + RMQ_WARN("rebalanceByTopic Exception: %s", e.what()); + } + } + } + + truncateMessageQueueNotMyTopic(); +} + +std::map<std::string, SubscriptionData>& RebalanceImpl::getSubscriptionInner() +{ + return m_subscriptionInner; +} + +std::map<MessageQueue, ProcessQueue*>& RebalanceImpl::getProcessQueueTable() +{ + return m_processQueueTable; +} + + +kpr::RWMutex& RebalanceImpl::getProcessQueueTableLock() +{ + return m_processQueueTableLock; +} + + +std::map<std::string, std::set<MessageQueue> >& RebalanceImpl::getTopicSubscribeInfoTable() +{ + return m_topicSubscribeInfoTable; +} + +std::string& RebalanceImpl::getConsumerGroup() +{ + return m_consumerGroup; +} + +void RebalanceImpl::setConsumerGroup(const std::string& consumerGroup) +{ + m_consumerGroup = consumerGroup; +} + +MessageModel RebalanceImpl::getMessageModel() +{ + return m_messageModel; +} + +void RebalanceImpl::setMessageModel(MessageModel messageModel) +{ + m_messageModel = messageModel; +} + +AllocateMessageQueueStrategy* RebalanceImpl::getAllocateMessageQueueStrategy() +{ + return m_pAllocateMessageQueueStrategy; +} + +void RebalanceImpl::setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy) +{ + m_pAllocateMessageQueueStrategy = 
pAllocateMessageQueueStrategy; +} + +MQClientFactory* RebalanceImpl::getmQClientFactory() +{ + return m_pMQClientFactory; +} + +void RebalanceImpl::setmQClientFactory(MQClientFactory* pMQClientFactory) +{ + m_pMQClientFactory = pMQClientFactory; +} + +std::map<std::string, std::set<MessageQueue> > RebalanceImpl::buildProcessQueueTableByBrokerName() +{ + std::map<std::string, std::set<MessageQueue> > result ; + kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock); + std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin(); + for (; it != m_processQueueTable.end();) + { + MessageQueue mq = it->first; + std::map<std::string, std::set<MessageQueue> >::iterator itm = result.find(mq.getBrokerName()); + if (itm == result.end()) + { + std::set<MessageQueue> mqs ; + mqs.insert(mq); + result[mq.getBrokerName()] = mqs; + } + else + { + itm->second.insert(mq); + } + } + + return result; +} + +void RebalanceImpl::rebalanceByTopic(const std::string& topic) +{ + RMQ_DEBUG("rebalanceByTopic begin, topic={%s}", topic.c_str()); + switch (m_messageModel) + { + case BROADCASTING: + { + //kpr::ScopedLock<kpr::Mutex> lock(m_topicSubscribeInfoTableLock); + std::map<std::string, std::set<MessageQueue> >::iterator it = m_topicSubscribeInfoTable.find(topic); + if (it != m_topicSubscribeInfoTable.end()) + { + std::set<MessageQueue> mqSet = it->second; + bool changed = updateProcessQueueTableInRebalance(topic, mqSet); + if (changed) + { + messageQueueChanged(topic, mqSet, mqSet); + RMQ_INFO("messageQueueChanged {%s} {%s} {%s} {%s}", + m_consumerGroup.c_str(), + topic.c_str(), + UtilAll::toString(mqSet).c_str(), + UtilAll::toString(mqSet).c_str()); + } + } + else + { + RMQ_WARN("doRebalance, {%s}, but the topic[%s] not exist.", m_consumerGroup.c_str(), topic.c_str()); + } + break; + } + case CLUSTERING: + { + //kpr::ScopedLock<kpr::Mutex> lock(m_topicSubscribeInfoTableLock); + std::map<std::string, std::set<MessageQueue> >::iterator it = 
m_topicSubscribeInfoTable.find(topic); + if (it == m_topicSubscribeInfoTable.end()) + { + if (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != 0) + { + RMQ_WARN("doRebalance, %s, but the topic[%s] not exist.", m_consumerGroup.c_str(), topic.c_str()); + } + } + + std::list<std::string> cidAll = m_pMQClientFactory->findConsumerIdList(topic, m_consumerGroup); + if (cidAll.empty()) + { + RMQ_WARN("doRebalance, %s:%s, get consumer id list failed.", m_consumerGroup.c_str(), topic.c_str()); + } + + if (it != m_topicSubscribeInfoTable.end() && !cidAll.empty()) + { + std::vector<MessageQueue> mqAll; + std::set<MessageQueue> mqSet = it->second; + std::set<MessageQueue>::iterator its = mqSet.begin(); + + for (; its != mqSet.end(); its++) + { + mqAll.push_back(*its); + } + + cidAll.sort(); + + AllocateMessageQueueStrategy* strategy = m_pAllocateMessageQueueStrategy; + + std::vector<MessageQueue>* allocateResult = NULL; + try + { + allocateResult = strategy->allocate(m_consumerGroup, + m_pMQClientFactory->getClientId(), mqAll, cidAll); + } + catch (std::exception& e) + { + RMQ_ERROR("AllocateMessageQueueStrategy.allocate Exception, allocateMessageQueueStrategyName={%s}, mqAll={%s}, cidAll={%s}, %s", + strategy->getName().c_str(), UtilAll::toString(mqAll).c_str(), UtilAll::toString(cidAll).c_str(), e.what()); + return; + } + + std::set<MessageQueue> allocateResultSet; + if (allocateResult != NULL) + { + for (size_t i = 0; i < allocateResult->size(); i++) + { + allocateResultSet.insert(allocateResult->at(i)); + } + + delete allocateResult; + } + + bool changed = updateProcessQueueTableInRebalance(topic, allocateResultSet); + if (changed) + { + RMQ_INFO("rebalanced result changed. 
allocateMessageQueueStrategyName={%s}, group={%s}, topic={%s}, ConsumerId={%s}, " + "rebalanceSize={%u}, rebalanceMqSet={%s}, mqAllSize={%u}, cidAllSize={%u}, mqAll={%s}, cidAll={%s}", + strategy->getName().c_str(), m_consumerGroup.c_str(), topic.c_str(), m_pMQClientFactory->getClientId().c_str(), + (unsigned)allocateResultSet.size(), UtilAll::toString(allocateResultSet).c_str(), + (unsigned)mqAll.size(), (unsigned)cidAll.size(), UtilAll::toString(mqAll).c_str(), UtilAll::toString(cidAll).c_str() + ); + + messageQueueChanged(topic, mqSet, allocateResultSet); + } + } + } + break; + default: + break; + } + RMQ_DEBUG("rebalanceByTopic end"); +} + + +void RebalanceImpl::removeProcessQueue(const MessageQueue& mq) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock); + std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.find(mq); + if (it != m_processQueueTable.end()) + { + MessageQueue mq = it->first; + ProcessQueue* pq = it->second; + bool isDroped = pq->isDropped(); + + this->removeUnnecessaryMessageQueue(mq, *pq); + RMQ_INFO("Fix Offset, {%s}, remove unnecessary mq, {%s} Droped: {%d}", + m_consumerGroup.c_str(), mq.toString().c_str(), isDroped); + } +} + + +bool RebalanceImpl::updateProcessQueueTableInRebalance(const std::string& topic, std::set<MessageQueue>& mqSet) +{ + RMQ_DEBUG("updateProcessQueueTableInRebalance begin, topic={%s}", topic.c_str()); + bool changed = false; + + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock); + std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin(); + for (; it != m_processQueueTable.end();) + { + std::map<MessageQueue, ProcessQueue*>::iterator itCur = it++; + MessageQueue mq = itCur->first; + ProcessQueue* pq = itCur->second; + if (mq.getTopic() == topic) + { + std::set<MessageQueue>::iterator itMq = mqSet.find(mq); + if (itMq == mqSet.end()) + { + pq->setDropped(true); + if (this->removeUnnecessaryMessageQueue(mq, *pq)) + { + changed = true; + 
m_processQueueTable.erase(itCur); + + RMQ_WARN("doRebalance, {%s}, remove unnecessary mq, {%s}", + m_consumerGroup.c_str(), mq.toString().c_str()); + } + } + else if (pq->isPullExpired()) + { + switch(this->consumeType()) + { + case CONSUME_ACTIVELY: + break; + case CONSUME_PASSIVELY: + pq->setDropped(true); + if (this->removeUnnecessaryMessageQueue(mq, *pq)) + { + changed = true; + m_processQueueTable.erase(itCur); + + RMQ_ERROR("[BUG]doRebalance, {%s}, remove unnecessary mq, {%s}, because pull is pause, so try to fixed it", + m_consumerGroup.c_str(), mq.toString().c_str()); + } + break; + default: + break; + } + } + } + } + } + + std::list<PullRequest*> pullRequestList; + std::set<MessageQueue>::iterator its = mqSet.begin(); + for (; its != mqSet.end(); its++) + { + MessageQueue mq = *its; + bool find = false; + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock); + std::map<MessageQueue, ProcessQueue*>::iterator itm = m_processQueueTable.find(mq); + if (itm != m_processQueueTable.end()) + { + find = true; + } + } + + if (!find) + { + //todo: memleak + PullRequest* pullRequest = new PullRequest(); + pullRequest->setConsumerGroup(m_consumerGroup); + pullRequest->setMessageQueue(mq); + pullRequest->setProcessQueue(new ProcessQueue());//todo: memleak + + long long nextOffset = computePullFromWhere(mq); + if (nextOffset >= 0) + { + pullRequest->setNextOffset(nextOffset); + pullRequestList.push_back(pullRequest); + changed = true; + + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock); + m_processQueueTable[mq] = pullRequest->getProcessQueue(); + RMQ_INFO("doRebalance, {%s}, add a new mq, {%s}, pullRequst: %s", + m_consumerGroup.c_str(), mq.toString().c_str(), pullRequest->toString().c_str()); + } + } + else + { + RMQ_WARN("doRebalance, {%s}, add new mq failed, {%s}", + m_consumerGroup.c_str(), mq.toString().c_str()); + } + } + } + + //todo memleak + dispatchPullRequest(pullRequestList); + RMQ_DEBUG("updateProcessQueueTableInRebalance end"); 
+ + return changed; +} + +void RebalanceImpl::truncateMessageQueueNotMyTopic() +{ + std::map<std::string, SubscriptionData> subTable = getSubscriptionInner(); + + kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock); + std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin(); + for (; it != m_processQueueTable.end();) + { + MessageQueue mq = it->first; + std::map<std::string, SubscriptionData>::iterator itt = subTable.find(mq.getTopic()); + + if (itt == subTable.end()) + { + ProcessQueue* pq = it->second; + if (pq != NULL) + { + pq->setDropped(true); + RMQ_WARN("doRebalance, {%s}, truncateMessageQueueNotMyTopic remove unnecessary mq, {%s}", + m_consumerGroup.c_str(), mq.toString().c_str()); + } + m_processQueueTable.erase(it++); + } + else + { + it++; + } + } +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalanceImpl.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/RebalanceImpl.h b/rocketmq-client4cpp/src/consumer/RebalanceImpl.h new file mode 100755 index 0000000..577a031 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/RebalanceImpl.h @@ -0,0 +1,102 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __REBALANCEIMPL_H__ +#define __REBALANCEIMPL_H__ + +#include <map> +#include <string> +#include <set> +#include <list> + +#include "ConsumeType.h" +#include "MessageQueue.h" +#include "ProcessQueue.h" +#include "PullRequest.h" +#include "SubscriptionData.h" + +namespace rmq +{ + class AllocateMessageQueueStrategy; + class MQClientFactory; + + class RebalanceImpl + { + public: + RebalanceImpl(const std::string& consumerGroup, + MessageModel messageModel, + AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy, + MQClientFactory* pMQClientFactory); + virtual ~RebalanceImpl(); + + virtual void messageQueueChanged(const std::string& topic, + std::set<MessageQueue>& mqAll, + std::set<MessageQueue>& mqDivided) = 0; + virtual bool removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq) = 0; + virtual void dispatchPullRequest(std::list<PullRequest*>& pullRequestList) = 0; + virtual long long computePullFromWhere(MessageQueue& mq) = 0; + virtual ConsumeType consumeType() = 0; + + bool lock(MessageQueue& mq); + void lockAll(); + + void unlock(MessageQueue& mq, bool oneway); + void unlockAll(bool oneway); + + void doRebalance(); + + std::map<MessageQueue, ProcessQueue*>& getProcessQueueTable(); + kpr::RWMutex& getProcessQueueTableLock(); + std::map<std::string, SubscriptionData>& getSubscriptionInner(); + std::map<std::string, std::set<MessageQueue> >& getTopicSubscribeInfoTable(); + + std::string& getConsumerGroup(); + void setConsumerGroup(const std::string& consumerGroup); + + MessageModel getMessageModel(); + void setMessageModel(MessageModel messageModel); + + AllocateMessageQueueStrategy* getAllocateMessageQueueStrategy(); + void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy); + + MQClientFactory* getmQClientFactory(); + void setmQClientFactory(MQClientFactory* pMQClientFactory); + + void removeProcessQueue(const MessageQueue& mq); + + private: + std::map<std::string, std::set<MessageQueue> > 
buildProcessQueueTableByBrokerName(); + void rebalanceByTopic(const std::string& topic); + bool updateProcessQueueTableInRebalance(const std::string& topic, std::set<MessageQueue>& mqSet); + void truncateMessageQueueNotMyTopic(); + + protected: + std::map<MessageQueue, ProcessQueue*> m_processQueueTable; + kpr::RWMutex m_processQueueTableLock; + + std::map<std::string, std::set<MessageQueue> > m_topicSubscribeInfoTable; + kpr::Mutex m_topicSubscribeInfoTableLock; + + std::map<std::string, SubscriptionData> m_subscriptionInner; + kpr::Mutex m_subscriptionInnerLock; + + std::string m_consumerGroup; + MessageModel m_messageModel; + AllocateMessageQueueStrategy* m_pAllocateMessageQueueStrategy; + MQClientFactory* m_pMQClientFactory; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp new file mode 100755 index 0000000..1aa287b --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp @@ -0,0 +1,79 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#include "RebalancePullImpl.h" +#include "DefaultMQPullConsumerImpl.h" +#include "AllocateMessageQueueStrategy.h" +#include "MQClientFactory.h" +#include "MessageQueueListener.h" +#include "OffsetStore.h" +#include "DefaultMQPullConsumer.h" + +namespace rmq +{ + +RebalancePullImpl::RebalancePullImpl(DefaultMQPullConsumerImpl* pDefaultMQPullConsumerImpl) + : RebalanceImpl("", BROADCASTING, NULL, NULL), + m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl) +{ +} + +RebalancePullImpl::RebalancePullImpl(const std::string& consumerGroup, + MessageModel messageModel, + AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy, + MQClientFactory* pMQClientFactory, + DefaultMQPullConsumerImpl* pDefaultMQPullConsumerImpl) + : RebalanceImpl(consumerGroup, messageModel, pAllocateMessageQueueStrategy, pMQClientFactory), + m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl) +{ +} + +long long RebalancePullImpl::computePullFromWhere(MessageQueue& mq) +{ + return 0; +} + +void RebalancePullImpl::dispatchPullRequest(std::list<PullRequest*>& pullRequestList) +{ +} + +void RebalancePullImpl::messageQueueChanged(const std::string& topic, + std::set<MessageQueue>& mqAll, + std::set<MessageQueue>& mqDivided) +{ + MessageQueueListener* messageQueueListener = + m_pDefaultMQPullConsumerImpl->getDefaultMQPullConsumer()->getMessageQueueListener(); + if (messageQueueListener != NULL) + { + try + { + messageQueueListener->messageQueueChanged(topic, mqAll, mqDivided); + } + catch (...) 
+ { + RMQ_ERROR("messageQueueChanged exception, %s", topic.c_str()); + } + } +} + +bool RebalancePullImpl::removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq) +{ + m_pDefaultMQPullConsumerImpl->getOffsetStore()->persist(mq); + m_pDefaultMQPullConsumerImpl->getOffsetStore()->removeOffset(mq); + return true; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h new file mode 100755 index 0000000..46dbcd1 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h @@ -0,0 +1,56 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __REBALANCEPULLIMPL_H__ +#define __REBALANCEPULLIMPL_H__ + +#include "RebalanceImpl.h" + +namespace rmq +{ +class DefaultMQPullConsumerImpl; + +class RebalancePullImpl : public RebalanceImpl +{ +public: + RebalancePullImpl(DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl); + + RebalancePullImpl(const std::string &consumerGroup, + MessageModel messageModel, + AllocateMessageQueueStrategy *pAllocateMessageQueueStrategy, + MQClientFactory *pMQClientFactory, + DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl); + + long long computePullFromWhere(MessageQueue &mq); + + void dispatchPullRequest(std::list<PullRequest *> &pullRequestList); + + void messageQueueChanged(const std::string &topic, + std::set<MessageQueue> &mqAll, + std::set<MessageQueue> &mqDivided); + + bool removeUnnecessaryMessageQueue(MessageQueue &mq, ProcessQueue &pq); + + ConsumeType consumeType() + { + return CONSUME_ACTIVELY; + }; + +private: + DefaultMQPullConsumerImpl *m_pDefaultMQPullConsumerImpl; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp new file mode 100755 index 0000000..fde770d --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp @@ -0,0 +1,217 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "RebalancePushImpl.h" + +#include <string.h> +#include <limits.h> + +#include "DefaultMQPushConsumerImpl.h" +#include "AllocateMessageQueueStrategy.h" +#include "MQClientFactory.h" +#include "MessageQueueListener.h" +#include "OffsetStore.h" +#include "DefaultMQPushConsumer.h" +#include "MQAdminImpl.h" + + +namespace rmq +{ + +RebalancePushImpl::RebalancePushImpl(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl) + : RebalanceImpl("", BROADCASTING, NULL, NULL), + m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl) +{ +} + +RebalancePushImpl::RebalancePushImpl(const std::string& consumerGroup, + MessageModel messageModel, + AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy, + MQClientFactory* pMQClientFactory, + DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl) + : RebalanceImpl(consumerGroup, messageModel, pAllocateMessageQueueStrategy, pMQClientFactory), + m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl) +{ +} + +void RebalancePushImpl::dispatchPullRequest(std::list<PullRequest*>& pullRequestList) +{ + std::list<PullRequest*>::iterator it = pullRequestList.begin(); + for (; it != pullRequestList.end(); it++) + { + m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(*it); + RMQ_INFO("doRebalance, {%s}, add a new pull request {%s}", + m_consumerGroup.c_str(), (*it)->toString().c_str()); + } +} + +long long RebalancePushImpl::computePullFromWhere(MessageQueue& mq) +{ + long long result = -1; + ConsumeFromWhere consumeFromWhere = + m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer()->getConsumeFromWhere(); + OffsetStore* offsetStore = m_pDefaultMQPushConsumerImpl->getOffsetStore(); + + switch (consumeFromWhere) + { + case CONSUME_FROM_FIRST_OFFSET: + { + long long lastOffset = offsetStore->readOffset(mq, READ_FROM_STORE); + if (lastOffset >= 0) + { + result = lastOffset; + } + else if (-1 == lastOffset) + 
{ + result = 0L; + } + else + { + result = -1; + } + break; + } + case CONSUME_FROM_LAST_OFFSET: + { + long long lastOffset = offsetStore->readOffset(mq, READ_FROM_STORE); + if (lastOffset >= 0) + { + result = lastOffset; + } + else if (-1 == lastOffset) + { + if (strncmp(MixAll::RETRY_GROUP_TOPIC_PREFIX.c_str(), mq.getTopic().c_str(), MixAll::RETRY_GROUP_TOPIC_PREFIX.size()) == 0) + { + result = 0L; + } + else + { + //result = LLONG_MAX; + try + { + result = m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq); + } + catch(...) + { + result = -1; + } + } + } + else + { + result = -1; + } + break; + } + + case CONSUME_FROM_MAX_OFFSET: + result = LLONG_MAX; + break; + case CONSUME_FROM_MIN_OFFSET: + result = 0L; + break; + case CONSUME_FROM_TIMESTAMP: + { + long long lastOffset = offsetStore->readOffset(mq, READ_FROM_STORE); + if (lastOffset >= 0) + { + result = lastOffset; + } + else if (-1 == lastOffset) + { + if (strncmp(MixAll::RETRY_GROUP_TOPIC_PREFIX.c_str(), mq.getTopic().c_str(), MixAll::RETRY_GROUP_TOPIC_PREFIX.size()) == 0) + { + //result = LLONG_MAX; + try + { + result = m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq); + } + catch(...) + { + result = -1; + } + } + else + { + try + { + long timestamp = UtilAll::str2tm( + m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer()->getConsumeTimestamp(), + rmq::yyyyMMddHHmmss); + result = m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp); + } + catch(...) 
+ { + result = -1; + } + } + } + else + { + result = -1; + } + break; + } + break; + default: + break; + } + + return result; +} + +void RebalancePushImpl::messageQueueChanged(const std::string& topic, + std::set<MessageQueue>& mqAll, + std::set<MessageQueue>& mqDivided) +{ +} + + +/** +* Persists and then drops the stored offset of a queue that is being taken away +* from this consumer. For orderly consumption in CLUSTERING mode the queue is also +* released through unlock(mq, true) while the ProcessQueue consume lock is held. +* +* @param mq queue being removed from this consumer +* @param pq process queue whose consume lock guards the unlock +* @return true when the queue may be removed: always outside orderly CLUSTERING +* mode; otherwise only when TryLock(1000) obtained the consume lock and +* unlock() did not throw std::exception. +*/ +bool RebalancePushImpl::removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq) +{ + m_pDefaultMQPushConsumerImpl->getOffsetStore()->persist(mq); + m_pDefaultMQPushConsumerImpl->getOffsetStore()->removeOffset(mq); + if (m_pDefaultMQPushConsumerImpl->isConsumeOrderly() + && m_pDefaultMQPushConsumerImpl->messageModel() == CLUSTERING) + { + /* Only a completed unlock() may report success; a skipped or failed + unlock must keep returning false so the queue is retried later. */ + bool unlocked = false; + if (pq.getLockConsume().TryLock(1000)) + { + try + { + this->unlock(mq, true); + unlocked = true; + } + catch (std::exception& e) + { + RMQ_ERROR("removeUnnecessaryMessageQueue Exception: %s", e.what()); + } + /* Release the consume lock before returning on either outcome. */ + pq.getLockConsume().Unlock(); + } + else + { + RMQ_WARN("[WRONG]mq is consuming, so can not unlock it, MQ:%s, maybe hanged for a while, times:{%lld}", + mq.toString().c_str(), + pq.getTryUnlockTimes()); + + pq.incTryUnlockTimes(); + } + + return unlocked; + } + + return true; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h new file mode 100755 index 0000000..0aa2b0e --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h @@ -0,0 +1,55 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __REBALANCEPUSHIMPL_H__ +#define __REBALANCEPUSHIMPL_H__ + +#include "RebalanceImpl.h" + +namespace rmq +{ +class DefaultMQPushConsumerImpl; + +class RebalancePushImpl : public RebalanceImpl +{ +public: + RebalancePushImpl(DefaultMQPushConsumerImpl *pDefaultMQPushConsumerImpl); + + RebalancePushImpl(const std::string &consumerGroup, + MessageModel messageModel, + AllocateMessageQueueStrategy *pAllocateMessageQueueStrategy, + MQClientFactory *pMQClientFactory, + DefaultMQPushConsumerImpl *pDefaultMQPushConsumerImpl); + + void dispatchPullRequest(std::list<PullRequest *> &pullRequestList); + long long computePullFromWhere(MessageQueue &mq); + void messageQueueChanged(const std::string &topic, + std::set<MessageQueue> &mqAll, + std::set<MessageQueue> &mqDivided); + bool removeUnnecessaryMessageQueue(MessageQueue &mq, ProcessQueue &pq); + + + ConsumeType consumeType() + { + return CONSUME_PASSIVELY; + }; + +private: + DefaultMQPushConsumerImpl *m_pDefaultMQPushConsumerImpl; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalanceService.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/RebalanceService.cpp b/rocketmq-client4cpp/src/consumer/RebalanceService.cpp new file mode 100644 index 0000000..013fefb --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/RebalanceService.cpp @@ -0,0 +1,55 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, 
Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "RebalanceService.h" +#include "MQClientFactory.h" + +namespace rmq +{ + +long RebalanceService::s_WaitInterval = 1000 * 10; + +RebalanceService::RebalanceService(MQClientFactory* pMQClientFactory) + : ServiceThread("RebalanceService"), + m_pMQClientFactory(pMQClientFactory) +{ +} + + +RebalanceService::~RebalanceService() +{ + +} + +void RebalanceService::Run() +{ + RMQ_INFO("%s service started", getServiceName().c_str()); + + while (!m_stoped) + { + waitForRunning(s_WaitInterval); + m_pMQClientFactory->doRebalance(); + } + + RMQ_INFO("%s service end", getServiceName().c_str()); +} + +std::string RebalanceService::getServiceName() +{ + return "RebalanceService"; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalanceService.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/RebalanceService.h b/rocketmq-client4cpp/src/consumer/RebalanceService.h new file mode 100755 index 0000000..ef4d746 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/RebalanceService.h @@ -0,0 +1,44 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __REBALANCESERVICE_H__ +#define __REBALANCESERVICE_H__ + +#include "ServiceThread.h" + +namespace rmq +{ + class MQClientFactory; + + /** + * Rebalance service + * + */ + class RebalanceService : public ServiceThread + { + public: + RebalanceService(MQClientFactory* pMQClientFactory); + ~RebalanceService(); + + void Run(); + std::string getServiceName(); + + private: + MQClientFactory* m_pMQClientFactory; + static long s_WaitInterval; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp new file mode 100755 index 0000000..1c4fd23 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp @@ -0,0 +1,266 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#include "RemoteBrokerOffsetStore.h" +#include "MQClientFactory.h" +#include "ScopedLock.h" +#include "MQClientException.h" +#include "CommandCustomHeader.h" +#include "MQClientAPIImpl.h" + +namespace rmq +{ + +RemoteBrokerOffsetStore::RemoteBrokerOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName) +{ + m_pMQClientFactory = pMQClientFactory; + m_groupName = groupName; +} + +void RemoteBrokerOffsetStore::load() +{ + +} + +void RemoteBrokerOffsetStore::updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly) +{ + kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex); + typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq); + if (it == m_offsetTable.end()) + { + m_offsetTable[mq] = offset; + it = m_offsetTable.find(mq); + } + + kpr::AtomicLong& offsetOld = it->second; + if (increaseOnly) + { + MixAll::compareAndIncreaseOnly(offsetOld, offset); + } + else + { + offsetOld.set(offset); + } +} + +long long RemoteBrokerOffsetStore::readOffset(const MessageQueue& mq, ReadOffsetType type) +{ + RMQ_DEBUG("readOffset, MQ:%s, type:%d", mq.toString().c_str(), type); + switch (type) + { + case MEMORY_FIRST_THEN_STORE: + case READ_FROM_MEMORY: + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex); + typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq); + if (it != m_offsetTable.end()) + { + return it->second.get(); + } + else if (READ_FROM_MEMORY == type) + { + RMQ_DEBUG("No offset in memory, MQ:%s", mq.toString().c_str()); + return -1; + } + } + case READ_FROM_STORE: + { + try + { + long long brokerOffset = this->fetchConsumeOffsetFromBroker(mq); + RMQ_DEBUG("fetchConsumeOffsetFromBroker, MQ:%s, brokerOffset:%lld", + mq.toString().c_str(), brokerOffset); + if (brokerOffset >= 0) + { + this->updateOffset(mq, brokerOffset, false); + } + return brokerOffset; + } + // No offset in broker + catch (const MQBrokerException& e) + { + RMQ_WARN("No offset in broker, MQ:%s, exception:%s", mq.toString().c_str(), e.what()); + return -1; 
+ } + catch (const std::exception& e) + { + RMQ_ERROR("fetchConsumeOffsetFromBroker exception, MQ:%s, msg:%s", + mq.toString().c_str(), e.what()); + return -2; + } + catch (...) + { + RMQ_ERROR("fetchConsumeOffsetFromBroker unknow exception, MQ:%s", + mq.toString().c_str()); + return -2; + } + } + default: + break; + } + + return -1; +} + +/** +* Sends the in-memory offset of every queue contained in mqs to the broker and +* erases the table entries of queues that are not contained in mqs. +* An empty mqs is a no-op. A failed broker update is logged and skipped; the +* success log line is only written on every 12th invocation. +* +* @param mqs queues whose offsets are to be persisted +*/ +void RemoteBrokerOffsetStore::persistAll(std::set<MessageQueue>& mqs) +{ + if (mqs.empty()) + { + return; + } + + std::set<MessageQueue> unusedMQ; + long long times = m_storeTimesTotal.fetchAndAdd(1); + + /* Exclusive lock: m_offsetTable entries are erased further down, which + must not run concurrently with readers holding the shared lock. */ + kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex); + for (typeof(m_offsetTable.begin()) it = m_offsetTable.begin(); + it != m_offsetTable.end(); it++) + { + MessageQueue mq = it->first; + kpr::AtomicLong& offset = it->second; + if (mqs.find(mq) != mqs.end()) + { + try + { + this->updateConsumeOffsetToBroker(mq, offset.get()); + if ((times % 12) == 0) + { + RMQ_INFO("updateConsumeOffsetToBroker, Group: {%s} ClientId: {%s} mq:{%s} offset {%llu}", + m_groupName.c_str(), + m_pMQClientFactory->getClientId().c_str(), + mq.toString().c_str(), + offset.get()); + } + } + catch (...) + { + RMQ_ERROR("updateConsumeOffsetToBroker exception, mq=%s", mq.toString().c_str()); + } + } + else + { + /* Collected first and erased after the loop so the iterator stays valid. */ + unusedMQ.insert(mq); + } + } + + if (!unusedMQ.empty()) + { + for (typeof(unusedMQ.begin()) it = unusedMQ.begin(); it != unusedMQ.end(); it++) + { + m_offsetTable.erase(*it); + RMQ_INFO("remove unused mq, %s, %s", it->toString().c_str(), m_groupName.c_str()); + } + } +} + +void RemoteBrokerOffsetStore::persist(const MessageQueue& mq) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex); + typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq); + if (it != m_offsetTable.end()) + { + try + { + this->updateConsumeOffsetToBroker(mq, it->second.get()); + RMQ_DEBUG("updateConsumeOffsetToBroker ok, mq=%s, offset=%lld", mq.toString().c_str(), it->second.get()); + } + catch (...)
+ { + RMQ_ERROR("updateConsumeOffsetToBroker exception, mq=%s", mq.toString().c_str()); + } + } +} + +void RemoteBrokerOffsetStore::removeOffset(const MessageQueue& mq) +{ + kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex); + m_offsetTable.erase(mq); + RMQ_INFO("remove unnecessary messageQueue offset. mq=%s, offsetTableSize=%u", + mq.toString().c_str(), (unsigned)m_offsetTable.size()); +} + + +std::map<MessageQueue, long long> RemoteBrokerOffsetStore::cloneOffsetTable(const std::string& topic) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex); + std::map<MessageQueue, long long> cloneOffsetTable; + RMQ_FOR_EACH(m_offsetTable, it) + { + MessageQueue mq = it->first; + kpr::AtomicLong& offset = it->second; + if (topic == mq.getTopic()) + { + cloneOffsetTable[mq] = offset.get(); + } + } + + return cloneOffsetTable; +} + + +void RemoteBrokerOffsetStore::updateConsumeOffsetToBroker(const MessageQueue& mq, long long offset) +{ + FindBrokerResult findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName()); + if (findBrokerResult.brokerAddr.empty()) + { + m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic()); + findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName()); + } + + if (!findBrokerResult.brokerAddr.empty()) + { + UpdateConsumerOffsetRequestHeader* requestHeader = new UpdateConsumerOffsetRequestHeader(); + requestHeader->topic = mq.getTopic(); + requestHeader->consumerGroup = this->m_groupName; + requestHeader->queueId = mq.getQueueId(); + requestHeader->commitOffset = offset; + + m_pMQClientFactory->getMQClientAPIImpl()->updateConsumerOffsetOneway( + findBrokerResult.brokerAddr, requestHeader, 1000 * 5); + } + else + { + THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); + } +} + +long long RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(const MessageQueue& mq) +{ + FindBrokerResult findBrokerResult = 
m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName()); + if (findBrokerResult.brokerAddr.empty()) + { + // TODO Here may be heavily overhead for Name Server,need tuning + m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic()); + findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName()); + } + + if (!findBrokerResult.brokerAddr.empty()) + { + QueryConsumerOffsetRequestHeader* requestHeader = new QueryConsumerOffsetRequestHeader(); + requestHeader->topic = mq.getTopic(); + requestHeader->consumerGroup = this->m_groupName; + requestHeader->queueId = mq.getQueueId(); + + return m_pMQClientFactory->getMQClientAPIImpl()->queryConsumerOffset( + findBrokerResult.brokerAddr, requestHeader, 1000 * 5); + } + else + { + THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); + } +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h new file mode 100755 index 0000000..b613084 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h @@ -0,0 +1,61 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __REMOTEBROKEROFFSETSTORE_H__ +#define __REMOTEBROKEROFFSETSTORE_H__ + +#include "OffsetStore.h" +#include <map> +#include <string> +#include <set> +#include "MessageQueue.h" +#include "AtomicValue.h" +#include "Mutex.h" + +namespace rmq +{ + class MQClientFactory; + + /** + * offset remote store + * + */ + class RemoteBrokerOffsetStore : public OffsetStore + { + public: + RemoteBrokerOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName) ; + + void load(); + void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly); + long long readOffset(const MessageQueue& mq, ReadOffsetType type); + void persistAll(std::set<MessageQueue>& mqs); + void persist(const MessageQueue& mq); + void removeOffset(const MessageQueue& mq) ; + std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic); + + private: + void updateConsumeOffsetToBroker(const MessageQueue& mq, long long offset); + long long fetchConsumeOffsetFromBroker(const MessageQueue& mq); + + private: + MQClientFactory* m_pMQClientFactory; + std::string m_groupName; + kpr::AtomicInteger m_storeTimesTotal; + std::map<MessageQueue, kpr::AtomicLong> m_offsetTable; + kpr::RWMutex m_tableMutex; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp b/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp new file mode 100755 index 0000000..ed5cf12 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp @@ -0,0 +1,201 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "SubscriptionData.h" + +#include <sstream> +#include "KPRUtil.h" +#include "UtilAll.h" + +namespace rmq +{ + +std::string SubscriptionData::SUB_ALL = "*"; + +SubscriptionData::SubscriptionData() +{ + m_subVersion = KPRUtil::GetCurrentTimeMillis(); +} + +SubscriptionData::SubscriptionData(const std::string& topic, const std::string& subString) + : m_topic(topic), + m_subString(subString) +{ + m_subVersion = KPRUtil::GetCurrentTimeMillis(); +} + +std::string SubscriptionData::getTopic()const +{ + return m_topic; +} + +void SubscriptionData::setTopic(const std::string& topic) +{ + m_topic = topic; +} + +std::string SubscriptionData::getSubString() +{ + return m_subString; +} + +void SubscriptionData::setSubString(const std::string& subString) +{ + m_subString = subString; +} + +std::set<std::string>& SubscriptionData::getTagsSet() +{ + return m_tagsSet; +} + +void SubscriptionData::setTagsSet(const std::set<std::string>& tagsSet) +{ + m_tagsSet = tagsSet; +} + +long long SubscriptionData::getSubVersion() +{ + return m_subVersion; +} + +void SubscriptionData::setSubVersion(long long subVersion) +{ + m_subVersion = subVersion; +} + +std::set<int>& SubscriptionData::getCodeSet() +{ + return m_codeSet; +} + +void SubscriptionData::setCodeSet(const std::set<int>& codeSet) +{ + m_codeSet = codeSet; +} + +int SubscriptionData::hashCode() +{ + /* + final int prime = 31; + int result = 1; + result = prime * result + (classFilterMode ? 1231 : 1237); + result = prime * result + ((codeSet == null) ? 
0 : codeSet.hashCode()); + result = prime * result + ((subString == null) ? 0 : subString.hashCode()); + result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode()); + result = prime * result + ((topic == null) ? 0 : topic.hashCode()); + return result; + */ + std::stringstream ss; + ss << UtilAll::hashCode(m_codeSet) + << m_subString + << UtilAll::hashCode(m_tagsSet) + << m_topic; + return UtilAll::hashCode(ss.str()); +} + + + + +bool SubscriptionData::operator==(const SubscriptionData& other) +{ + if (m_codeSet != other.m_codeSet) + { + return false; + } + + if (m_subString != other.m_subString) + { + return false; + } + + if (m_subVersion != other.m_subVersion) + { + return false; + } + + if (m_tagsSet != other.m_tagsSet) + { + return false; + } + + if (m_topic != other.m_topic) + { + return false; + } + + return true; +} + +bool SubscriptionData::operator<(const SubscriptionData& other)const +{ + if (m_topic < other.m_topic) + { + return true; + } + else if (m_topic == other.m_topic) + { + if (m_subString < other.m_subString) + { + return true; + } + else + { + return false; + } + } + else + { + return false; + } +} + +void SubscriptionData::toJson(Json::Value& obj) const +{ + obj["classFilterMode"] = false; + obj["topic"] = m_topic; + obj["subString"] = m_subString; + obj["subVersion"] = (long long)m_subVersion; + + Json::Value tagSet(Json::arrayValue); + RMQ_FOR_EACH(m_tagsSet, it) + { + tagSet.append(*it); + } + obj["tagsSet"] = tagSet; + + Json::Value codeSet(Json::arrayValue); + RMQ_FOR_EACH(m_codeSet, it) + { + codeSet.append(*it); + } + obj["codeSet"] = codeSet; +} + +std::string SubscriptionData::toString() const +{ + std::stringstream ss; + ss << "{classFilterMode=" << false + << ",topic=" << m_topic + << ",subString=" << m_subString + << ",subVersion=" << m_subVersion + << ",tagsSet=" << UtilAll::toString(m_tagsSet) + << ",codeSet=" << UtilAll::toString(m_codeSet) + << "}"; + return ss.str(); +} + + +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/SubscriptionData.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/consumer/SubscriptionData.h b/rocketmq-client4cpp/src/consumer/SubscriptionData.h new file mode 100755 index 0000000..4796fb7 --- /dev/null +++ b/rocketmq-client4cpp/src/consumer/SubscriptionData.h @@ -0,0 +1,76 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __SUBSCRIPTIONDATA_H__ +#define __SUBSCRIPTIONDATA_H__ + +#include <string> +#include <set> + +#include "RocketMQClient.h" +#include "RemotingSerializable.h" +#include "RefHandle.h" +#include "json/json.h" + +namespace rmq +{ + class SubscriptionData : public kpr::RefCount + { + public: + SubscriptionData(); + SubscriptionData(const std::string& topic, const std::string& subString); + + std::string getTopic()const; + void setTopic(const std::string& topic); + + std::string getSubString(); + void setSubString(const std::string& subString); + + std::set<std::string>& getTagsSet(); + void setTagsSet(const std::set<std::string>& tagsSet); + + long long getSubVersion(); + void setSubVersion(long long subVersion); + + std::set<int>& getCodeSet(); + void setCodeSet(const std::set<int>& codeSet); + + int hashCode(); + void toJson(Json::Value& obj) const; + std::string toString() const; + + bool operator==(const SubscriptionData& other); + bool operator<(const SubscriptionData& other)const; + + public: + static std::string SUB_ALL; + + private: + std::string m_topic; + std::string m_subString; + std::set<std::string> m_tagsSet; + std::set<int> m_codeSet; + long long m_subVersion ; + }; + typedef kpr::RefHandleT<SubscriptionData> SubscriptionDataPtr; + + inline std::ostream& operator<<(std::ostream& os, const SubscriptionData& obj) + { + os << obj.toString(); + return os; + } +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/AUTHORS ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/jsoncpp/AUTHORS b/rocketmq-client4cpp/src/jsoncpp/AUTHORS new file mode 100755 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/LICENSE ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/jsoncpp/LICENSE 
b/rocketmq-client4cpp/src/jsoncpp/LICENSE new file mode 100755 index 0000000..403d096 --- /dev/null +++ b/rocketmq-client4cpp/src/jsoncpp/LICENSE @@ -0,0 +1 @@ +The json-cpp library and this documentation are in Public Domain. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/README.txt ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/jsoncpp/README.txt b/rocketmq-client4cpp/src/jsoncpp/README.txt new file mode 100755 index 0000000..379d376 --- /dev/null +++ b/rocketmq-client4cpp/src/jsoncpp/README.txt @@ -0,0 +1,117 @@ +* Introduction: + ============= + +JSON (JavaScript Object Notation) is a lightweight data-interchange format. +It can represent integers, real numbers, strings, ordered sequences of +values, and collections of name/value pairs. + +JsonCpp is a simple API to manipulate JSON values and handle serialization +and unserialization to string. + +It can also preserve existing comments in unserialization/serialization steps, +making it a convenient format to store user input files. + +Unserialization parsing is user friendly and provides precise error reports. + + +* Building/Testing: + ================= + +JsonCpp uses Scons (http://www.scons.org) as a build system. Scons requires +python to be installed (http://www.python.org). + +Download the scons-local distribution from the following URL: +http://sourceforge.net/project/showfiles.php?group_id=30337&package_id=67375 + +Unzip it in the directory where you found this README file. scons.py should be +at the same level as README.
+ +python scons.py platform=PLTFRM [TARGET] +where PLTFRM may be one of: + suncc Sun C++ (Solaris) + vacpp Visual Age C++ (AIX) + mingw + msvc6 Microsoft Visual Studio 6 service pack 5-6 + msvc70 Microsoft Visual Studio 2002 + msvc71 Microsoft Visual Studio 2003 + msvc80 Microsoft Visual Studio 2005 + linux-gcc Gnu C++ (linux, also reported to work for Mac OS X) + +Adding a platform is fairly simple. You need to change the SConstruct file +to do so. + +and TARGET may be: + check: build library and run unit tests. + + +* Running the test manually: + ========================== + +cd test +# This will run the Reader/Writer tests +python runjsontests.py "path to jsontest.exe" + +# This will run the Reader/Writer tests, using the JSONChecker test suite +# (http://www.json.org/JSON_checker/). +# Notes: not all tests pass: JsonCpp is too lenient (for example, +# it allows an integer to start with '0'). The goal is to improve +# strict mode parsing to get all tests to pass. +python runjsontests.py --with-json-checker "path to jsontest.exe" + +# This will run the unit tests (mostly Value) +python rununittests.py "path to test_lib_json.exe" + +You can run the tests using valgrind: +python rununittests.py --valgrind "path to test_lib_json.exe" + + +* Building the documentation: + =========================== + +Run the python script doxybuild.py from the top directory: + +python doxybuild.py --open --with-dot + +See doxybuild.py --help for options. + + +* Adding a reader/writer test: + ============================ + +To add a test, you need to create two files in test/data: +- a TESTNAME.json file, that contains the input document in JSON format. +- a TESTNAME.expected file, that contains a flattened representation of + the input document. + +TESTNAME.expected file format: +- each line represents a JSON element of the element tree represented + by the input document. +- each line has two parts: the path to access the element separated from + the element value by '='.
Array and object values are always empty + (e.g. represented by either [] or {}). +- element path: '.' represents the root element, and is used to separate + object members. [N] is used to specify the value of an array element + at index N. +See test_complex_01.json and test_complex_01.expected to better understand +element path. + + +* Understanding reader/writer test output: + ======================================== + +When a test is run, output files are generated alongside the input test files. +Below is a short description of the content of each file: + +- test_complex_01.json: input JSON document +- test_complex_01.expected: flattened JSON element tree used to check if + parsing was correct. + +- test_complex_01.actual: flattened JSON element tree produced by + jsontest.exe from reading test_complex_01.json +- test_complex_01.rewrite: JSON document written by jsontest.exe using the + Json::Value parsed from test_complex_01.json and serialized using + Json::StyledWriter. +- test_complex_01.actual-rewrite: flattened JSON element tree produced by + jsontest.exe from reading test_complex_01.rewrite. +- test_complex_01.process-output: jsontest.exe output, typically useful to + understand parsing errors. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/json/allocator.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/jsoncpp/json/allocator.h b/rocketmq-client4cpp/src/jsoncpp/json/allocator.h new file mode 100755 index 0000000..1235a3e --- /dev/null +++ b/rocketmq-client4cpp/src/jsoncpp/json/allocator.h @@ -0,0 +1,96 @@ +// Copyright 2007-2010 Baptiste Lepilleur +// Distributed under MIT license, or public domain if desired and +// recognized in your jurisdiction.
+// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE + +#ifndef CPPTL_JSON_ALLOCATOR_H_INCLUDED +#define CPPTL_JSON_ALLOCATOR_H_INCLUDED + +#include <cstring> +#include <memory> + +namespace rmq { +namespace Json { +template<typename T> +class SecureAllocator { + public: + // Type definitions + using value_type = T; + using pointer = T*; + using const_pointer = const T*; + using reference = T&; + using const_reference = const T&; + using size_type = std::size_t; + using difference_type = std::ptrdiff_t; + + /** + * Allocate memory for N items using the standard allocator. + */ + pointer allocate(size_type n) { + // allocate using "global operator new" + return static_cast<pointer>(::operator new(n * sizeof(T))); + } + + /** + * Release memory which was allocated for N items at pointer P. + * + * The memory block is filled with zeroes before being released. + * The pointer argument is tagged as "volatile" to prevent the + * compiler optimizing out this critical step. + */ + void deallocate(volatile pointer p, size_type n) { + std::memset(p, 0, n * sizeof(T)); + // free using "global operator delete" + ::operator delete(p); + } + + /** + * Construct an item in-place at pointer P. + */ + template<typename... Args> + void construct(pointer p, Args&&... args) { + // construct using "placement new" and "perfect forwarding" + ::new (static_cast<void*>(p)) T(std::forward<Args>(args)...); + } + + size_type max_size() const { + return size_t(-1) / sizeof(T); + } + + pointer address( reference x ) const { + return std::addressof(x); + } + + const_pointer address( const_reference x ) const { + return std::addressof(x); + } + + /** + * Destroy an item in-place at pointer P. 
+ */ + void destroy(pointer p) { + // destroy using "explicit destructor" + p->~T(); + } + + // Boilerplate + SecureAllocator() {} + template<typename U> SecureAllocator(const SecureAllocator<U>&) {} + template<typename U> struct rebind { using other = SecureAllocator<U>; }; +}; + + +template<typename T, typename U> +bool operator==(const SecureAllocator<T>&, const SecureAllocator<U>&) { + return true; +} + +template<typename T, typename U> +bool operator!=(const SecureAllocator<T>&, const SecureAllocator<U>&) { + return false; +} + +} //namespace Json +} //namespace rmq + +#endif // CPPTL_JSON_ALLOCATOR_H_INCLUDED http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/json/assertions.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/jsoncpp/json/assertions.h b/rocketmq-client4cpp/src/jsoncpp/json/assertions.h new file mode 100755 index 0000000..dc67b27 --- /dev/null +++ b/rocketmq-client4cpp/src/jsoncpp/json/assertions.h @@ -0,0 +1,54 @@ +// Copyright 2007-2010 Baptiste Lepilleur +// Distributed under MIT license, or public domain if desired and +// recognized in your jurisdiction. +// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE + +#ifndef CPPTL_JSON_ASSERTIONS_H_INCLUDED +#define CPPTL_JSON_ASSERTIONS_H_INCLUDED + +#include <stdlib.h> +#include <sstream> + +#if !defined(JSON_IS_AMALGAMATION) +#include "config.h" +#endif // if !defined(JSON_IS_AMALGAMATION) + +/** It should not be possible for a maliciously designed file to + * cause an abort() or seg-fault, so these macros are used only + * for pre-condition violations and internal logic errors. 
+ */ +#if JSON_USE_EXCEPTION + +// @todo <= add detail about condition in exception +# define JSON_ASSERT(condition) \ + {if (!(condition)) {rmq::Json::throwLogicError( "assert json failed" );}} + +# define JSON_FAIL_MESSAGE(message) \ + { \ + JSONCPP_OSTRINGSTREAM oss; oss << message; \ + rmq::Json::throwLogicError(oss.str()); \ + abort(); \ + } + +#else // JSON_USE_EXCEPTION + +# define JSON_ASSERT(condition) assert(condition) + +// The call to assert() will show the failure message in debug builds. In +// release builds we abort, for a core-dump or debugger. +# define JSON_FAIL_MESSAGE(message) \ + { \ + JSONCPP_OSTRINGSTREAM oss; oss << message; \ + assert(false && oss.str().c_str()); \ + abort(); \ + } + + +#endif + +#define JSON_ASSERT_MESSAGE(condition, message) \ + if (!(condition)) { \ + JSON_FAIL_MESSAGE(message); \ + } + +#endif // CPPTL_JSON_ASSERTIONS_H_INCLUDED http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/json/autolink.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/jsoncpp/json/autolink.h b/rocketmq-client4cpp/src/jsoncpp/json/autolink.h new file mode 100644 index 0000000..6fcc8af --- /dev/null +++ b/rocketmq-client4cpp/src/jsoncpp/json/autolink.h @@ -0,0 +1,25 @@ +// Copyright 2007-2010 Baptiste Lepilleur +// Distributed under MIT license, or public domain if desired and +// recognized in your jurisdiction. +// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE + +#ifndef JSON_AUTOLINK_H_INCLUDED +#define JSON_AUTOLINK_H_INCLUDED + +#include "config.h" + +#ifdef JSON_IN_CPPTL +#include <cpptl/cpptl_autolink.h> +#endif + +#if !defined(JSON_NO_AUTOLINK) && !defined(JSON_DLL_BUILD) && \ + !defined(JSON_IN_CPPTL) +#define CPPTL_AUTOLINK_NAME "json" +#undef CPPTL_AUTOLINK_DLL +#ifdef JSON_DLL +#define CPPTL_AUTOLINK_DLL +#endif +#include "autolink.h" +#endif + +#endif // JSON_AUTOLINK_H_INCLUDED
