http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientFactory.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/MQClientFactory.cpp b/rocketmq-client4cpp/src/MQClientFactory.cpp new file mode 100755 index 0000000..2b8208b --- /dev/null +++ b/rocketmq-client4cpp/src/MQClientFactory.cpp @@ -0,0 +1,1258 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include <math.h> +#include <set> +#include <string> +#include <iostream> +#include <vector> + +#include "MQClientFactory.h" +#include "RemoteClientConfig.h" +#include "ClientRemotingProcessor.h" +#include "MQClientAPIImpl.h" +#include "MQAdminImpl.h" +#include "DefaultMQProducer.h" +#include "PullMessageService.h" +#include "RebalanceService.h" +#include "ScopedLock.h" +#include "KPRUtil.h" +#include "DefaultMQProducerImpl.h" +#include "DefaultMQPushConsumerImpl.h" +#include "MQClientException.h" +#include "MQConsumerInner.h" +#include "MQProducerInner.h" +#include "UtilAll.h" +#include "PermName.h" +#include "MQClientManager.h" +#include "ConsumerStatManage.h" +#include "TopicPublishInfo.h" +#include "MQVersion.h" + +namespace rmq +{ + + +long MQClientFactory::LockTimeoutMillis = 3000; + +MQClientFactory::MQClientFactory(ClientConfig& clientConfig, int factoryIndex, const std::string& clientId) +{ + m_clientConfig = clientConfig; + m_factoryIndex = factoryIndex; + m_pRemoteClientConfig = new RemoteClientConfig(); + m_pRemoteClientConfig->clientCallbackExecutorThreads = clientConfig.getClientCallbackExecutorThreads(); + m_pClientRemotingProcessor = new ClientRemotingProcessor(this); + m_pMQClientAPIImpl = new MQClientAPIImpl(m_clientConfig, *m_pRemoteClientConfig, m_pClientRemotingProcessor); + + if (!m_clientConfig.getNamesrvAddr().empty()) + { + m_pMQClientAPIImpl->updateNameServerAddressList(m_clientConfig.getNamesrvAddr()); + RMQ_INFO("user specified name server address: {%s}", m_clientConfig.getNamesrvAddr().c_str()); + } + + m_clientId = clientId; + + m_pMQAdminImpl = new MQAdminImpl(this); + m_pPullMessageService = new PullMessageService(this); + m_pRebalanceService = new RebalanceService(this); + m_pDefaultMQProducer = new DefaultMQProducer(MixAll::CLIENT_INNER_PRODUCER_GROUP); + m_pDefaultMQProducer->resetClientConfig(clientConfig); + m_bootTimestamp = KPRUtil::GetCurrentTimeMillis(); + + m_pFetchNameServerAddrTask = new ScheduledTask(this, &MQClientFactory::fetchNameServerAddr); + m_pUpdateTopicRouteInfoFromNameServerTask = new ScheduledTask(this, &MQClientFactory::updateTopicRouteInfoFromNameServerTask); + m_pCleanBrokerTask = new ScheduledTask(this, &MQClientFactory::cleanBroker); + m_pPersistAllConsumerOffsetTask = new ScheduledTask(this, &MQClientFactory::persistAllConsumerOffsetTask); + m_pRecordSnapshotPeriodicallyTask = new ScheduledTask(this, &MQClientFactory::recordSnapshotPeriodicallyTask); + m_pLogStatsPeriodicallyTask = new ScheduledTask(this, &MQClientFactory::logStatsPeriodicallyTask); + + m_serviceState = CREATE_JUST; + + RMQ_INFO("created a new client Instance, FactoryIndex: {%d} ClinetID: {%s} Config: {%s} Version: {%s}", + m_factoryIndex, + m_clientId.c_str(), + m_clientConfig.toString().c_str(), + MQVersion::getVersionDesc(MQVersion::s_CurrentVersion)); +} + +MQClientFactory::~MQClientFactory() +{ + delete m_pRemoteClientConfig; + delete m_pClientRemotingProcessor; + delete m_pMQClientAPIImpl; + delete m_pMQAdminImpl; + delete m_pPullMessageService; + delete m_pRebalanceService; + delete m_pDefaultMQProducer; +} + +void MQClientFactory::start() +{ + RMQ_DEBUG("MQClientFactory::start()"); + kpr::ScopedLock<kpr::Mutex> lock(m_mutex); + switch (m_serviceState) + { + case CREATE_JUST: + makesureInstanceNameIsOnly(m_clientConfig.getInstanceName()); + + m_serviceState = START_FAILED; + if (m_clientConfig.getNamesrvAddr().empty()) + { + m_clientConfig.setNamesrvAddr(m_pMQClientAPIImpl->fetchNameServerAddr()); + } + + m_pMQClientAPIImpl->start(); + m_timerTaskManager.Init(5, 1000); + startScheduledTask(); + m_pPullMessageService->Start(); + m_pRebalanceService->Start(); + m_pDefaultMQProducer->getDefaultMQProducerImpl()->start(false); + + RMQ_INFO("the client factory [%s] start OK", m_clientId.c_str()); + m_serviceState = RUNNING; + break; + case RUNNING: + RMQ_WARN("MQClientFactory is already running."); + break; + case SHUTDOWN_ALREADY: + RMQ_ERROR("MQClientFactory should have already been shutted down"); + break; + case START_FAILED: + RMQ_ERROR("MQClientFactory started failed."); + THROW_MQEXCEPTION(MQClientException, "The Factory object start failed", -1); + default: + break; + } +} + + +void MQClientFactory::shutdown() +{ + RMQ_DEBUG("MQClientFactory::shutdown()"); + // Consumer + if (!m_consumerTable.empty()) + { + return; + } + + // AdminExt + if (!m_adminExtTable.empty()) + { + return; + } + + // Producer + if (m_producerTable.size() > 1) + { + return; + } + + RMQ_DEBUG("MQClientFactory::shutdown_begin"); + { + kpr::ScopedLock<kpr::Mutex> lock(m_mutex); + switch (m_serviceState) + { + case CREATE_JUST: + break; + case RUNNING: + m_pDefaultMQProducer->getDefaultMQProducerImpl()->shutdown(false); + + for (int i = 0; i < 6; i++) + { + m_timerTaskManager.UnRegisterTimer(m_scheduledTaskIds[i]); + } + + m_timerTaskManager.Stop(); + + m_pPullMessageService->stop(); + m_pPullMessageService->Join(); + + m_pMQClientAPIImpl->shutdown(); + m_pRebalanceService->stop(); + m_pRebalanceService->Join(); + + //closesocket(m_datagramSocket); + + MQClientManager::getInstance()->removeClientFactory(m_clientId); + m_serviceState = SHUTDOWN_ALREADY; + break; + case SHUTDOWN_ALREADY: + break; + default: + break; + } + } +} + + +void MQClientFactory::sendHeartbeatToAllBrokerWithLock() +{ + RMQ_DEBUG("TryLock m_lockHeartbeat: %p", &m_lockHeartbeat); + if (m_lockHeartbeat.TryLock()) + { + try + { + RMQ_DEBUG("TryLock m_lockHeartbeat ok"); + sendHeartbeatToAllBroker(); + } + catch (...) + { + RMQ_ERROR("sendHeartbeatToAllBroker exception"); + } + m_lockHeartbeat.Unlock(); + } + else + { + RMQ_WARN("TryLock heartBeat fail"); + } +} + +void MQClientFactory::updateTopicRouteInfoFromNameServer() +{ + std::set<std::string> topicList; + + // Consumer + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_consumerTableLock); + std::map<std::string, MQConsumerInner*>::iterator it = m_consumerTable.begin(); + for (; it != m_consumerTable.end(); it++) + { + MQConsumerInner* inner = it->second; + std::set<SubscriptionData> subList = inner->subscriptions(); + std::set<SubscriptionData>::iterator it1 = subList.begin(); + for (; it1 != subList.end(); it1++) + { + topicList.insert((*it1).getTopic()); + } + } + } + + // Producer + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_producerTableLock); + std::map<std::string, MQProducerInner*>::iterator it = m_producerTable.begin(); + for (; it != m_producerTable.end(); it++) + { + MQProducerInner* inner = it->second; + std::set<std::string> pubList = inner->getPublishTopicList(); + topicList.insert(pubList.begin(), pubList.end()); + } + } + + std::set<std::string>::iterator it2 = topicList.begin(); + for (; it2 != topicList.end(); it2++) + { + updateTopicRouteInfoFromNameServer(*it2); + } +} + +bool MQClientFactory::updateTopicRouteInfoFromNameServer(const std::string& topic) +{ + return updateTopicRouteInfoFromNameServer(topic, false, NULL); +} + +bool MQClientFactory::updateTopicRouteInfoFromNameServer(const std::string& topic, + bool isDefault, + DefaultMQProducer* pDefaultMQProducer) +{ + RMQ_DEBUG("TryLock m_lockNamesrv: 0x%p, topic: [%s]", &m_lockNamesrv, topic.c_str()); + if (m_lockNamesrv.TryLock(MQClientFactory::LockTimeoutMillis)) + { + RMQ_DEBUG("TryLock m_lockNamesrv ok"); + TopicRouteDataPtr topicRouteData = NULL; + try + { + if (isDefault && pDefaultMQProducer != NULL) + { + topicRouteData = + m_pMQClientAPIImpl->getDefaultTopicRouteInfoFromNameServer( + pDefaultMQProducer->getCreateTopicKey(), 1000 * 3); + if (topicRouteData.ptr() != NULL) + { + std::list<QueueData> dataList = topicRouteData->getQueueDatas(); + + std::list<QueueData>::iterator it = dataList.begin(); + for (; it != dataList.end(); it++) + { + QueueData data = *it; + + int queueNums = + std::min<int>(pDefaultMQProducer->getDefaultTopicQueueNums(), + data.readQueueNums); + data.readQueueNums = (queueNums); + data.writeQueueNums = (queueNums); + } + } + } + else + { + topicRouteData = + m_pMQClientAPIImpl->getTopicRouteInfoFromNameServer(topic, 1000 * 3); + } + + if (topicRouteData.ptr() != NULL) + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_topicRouteTableLock); + std::map<std::string, TopicRouteData>::iterator it = m_topicRouteTable.find(topic); + bool changed = false; + + if (it != m_topicRouteTable.end()) + { + changed = topicRouteDataIsChange(it->second, *topicRouteData); + if (!changed) + { + changed = isNeedUpdateTopicRouteInfo(topic); + if (changed) + { + RMQ_INFO("the topic[{%s}] route info changed, old[{%s}] ,new[{%s}]", + topic.c_str(), it->second.toString().c_str(), + topicRouteData->toString().c_str()); + } + } + } + else + { + changed = true; + } + + if (changed) + { + TopicRouteData cloneTopicRouteData = *topicRouteData; + + std::list<BrokerData> dataList = topicRouteData->getBrokerDatas(); + + std::list<BrokerData>::iterator it = dataList.begin(); + for (; it != dataList.end(); it++) + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_brokerAddrTableLock); + m_brokerAddrTable[(*it).brokerName] = (*it).brokerAddrs; + } + + { + TopicPublishInfoPtr publishInfo = + topicRouteData2TopicPublishInfo(topic, *topicRouteData); + publishInfo->setHaveTopicRouterInfo(true); + + kpr::ScopedRLock<kpr::RWMutex> lock(m_producerTableLock); + std::map<std::string, MQProducerInner*>::iterator it = m_producerTable.begin(); + for (; it != m_producerTable.end(); it++) + { + MQProducerInner* impl = it->second; + if (impl) + { + impl->updateTopicPublishInfo(topic, *publishInfo); + } + } + } + + { + std::set<MessageQueue>* subscribeInfo = + topicRouteData2TopicSubscribeInfo(topic, *topicRouteData); + + kpr::ScopedRLock<kpr::RWMutex> lock(m_consumerTableLock); + std::map<std::string, MQConsumerInner*>::iterator it = m_consumerTable.begin(); + for (; it != m_consumerTable.end(); it++) + { + MQConsumerInner* impl = it->second; + if (impl) + { + impl->updateTopicSubscribeInfo(topic, *subscribeInfo); + } + } + delete subscribeInfo; + } + + m_topicRouteTable[topic] = cloneTopicRouteData; + m_lockNamesrv.Unlock(); + RMQ_DEBUG("UnLock m_lockNamesrv ok"); + + RMQ_INFO("topicRouteTable.put[%s] = TopicRouteData[%s]", + topic.c_str(), cloneTopicRouteData.toString().c_str()); + return true; + } + } + else + { + //TODO log? + RMQ_WARN("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {%s}", + topic.c_str()); + } + } + catch (const std::exception& e) + { + if (!(topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) == 0) && topic != MixAll::DEFAULT_TOPIC) + { + RMQ_WARN("updateTopicRouteInfoFromNameServer Exception: %s", e.what()); + } + } + catch (...) + { + RMQ_WARN("updateTopicRouteInfoFromNameServer unknow Exception"); + } + + m_lockNamesrv.Unlock(); + RMQ_DEBUG("UnLock m_lockNamesrv ok"); + } + else + { + RMQ_WARN("TryLock m_lockNamesrv timeout %ldms", MQClientFactory::LockTimeoutMillis); + } + + return false; +} + +TopicPublishInfo* MQClientFactory::topicRouteData2TopicPublishInfo(const std::string& topic, + TopicRouteData& route) +{ + TopicPublishInfo* info = new TopicPublishInfo(); + if (!route.getOrderTopicConf().empty()) + { + std::vector<std::string> brokers; + UtilAll::Split(brokers, route.getOrderTopicConf(), ";"); + for (size_t i = 0; i < brokers.size(); i++) + { + std::vector<std::string> item; + UtilAll::Split(item, brokers[i], ":"); + int nums = atoi(item[1].c_str()); + for (int i = 0; i < nums; i++) + { + MessageQueue mq(topic, item[0], i); + info->getMessageQueueList().push_back(mq); + } + } + + info->setOrderTopic(true); + } + else + { + std::list<QueueData> qds = route.getQueueDatas(); + qds.sort(); + std::list<QueueData>::iterator it = qds.begin(); + for (; it != qds.end(); it++) + { + QueueData& qd = (*it); + if (PermName::isWriteable(qd.perm)) + { + bool find = false; + BrokerData brokerData; + std::list<BrokerData> bds = route.getBrokerDatas(); + std::list<BrokerData>::iterator it1 = bds.begin(); + + for (; it1 != bds.end(); it1++) + { + BrokerData& bd = (*it1); + if (bd.brokerName == qd.brokerName) + { + brokerData = bd; + find = true; + break; + } + } + + if (!find) + { + continue; + } + + if (brokerData.brokerAddrs.find(MixAll::MASTER_ID) == brokerData.brokerAddrs.end()) + { + continue; + } + + for (int i = 0; i < qd.writeQueueNums; i++) + { + MessageQueue mq(topic, qd.brokerName, i); + info->getMessageQueueList().push_back(mq); + } + } + } + + info->setOrderTopic(false); + } + + return info; +} + +std::set<MessageQueue>* MQClientFactory::topicRouteData2TopicSubscribeInfo(const std::string& topic, + TopicRouteData& route) +{ + std::set<MessageQueue>* mqList = new std::set<MessageQueue>(); + std::list<QueueData> qds = route.getQueueDatas(); + std::list<QueueData>::iterator it = qds.begin(); + for (; it != qds.end(); it++) + { + QueueData& qd = (*it); + if (PermName::isReadable(qd.perm)) + { + for (int i = 0; i < qd.readQueueNums; i++) + { + MessageQueue mq(topic, qd.brokerName, i); + mqList->insert(mq); + } + } + } + + return mqList; +} + +bool MQClientFactory::registerConsumer(const std::string& group, MQConsumerInner* pConsumer) +{ + if (group.empty() || pConsumer == NULL) + { + return false; + } + + kpr::ScopedWLock<kpr::RWMutex> lock(m_consumerTableLock); + if (m_consumerTable.find(group) != m_consumerTable.end()) + { + return false; + } + m_consumerTable[group] = pConsumer; + + return true; +} + +void MQClientFactory::unregisterConsumer(const std::string& group) +{ + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_consumerTableLock); + m_consumerTable.erase(group); + } + unregisterClientWithLock("", group); +} + +bool MQClientFactory::registerProducer(const std::string& group, DefaultMQProducerImpl* pProducer) +{ + if (group.empty() || pProducer == NULL) + { + return false; + } + + kpr::ScopedWLock<kpr::RWMutex> lock(m_producerTableLock); + if (m_producerTable.find(group) != m_producerTable.end()) + { + return false; + } + m_producerTable[group] = pProducer; + + return true; +} + +void MQClientFactory::unregisterProducer(const std::string& group) +{ + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_producerTableLock); + m_producerTable.erase(group); + } + unregisterClientWithLock(group, ""); +} + +bool MQClientFactory::registerAdminExt(const std::string& group, MQAdminExtInner* pAdmin) +{ + if (group.empty() || pAdmin == NULL) + { + return false; + } + + kpr::ScopedWLock<kpr::RWMutex> lock(m_adminExtTableLock); + if (m_adminExtTable.find(group) != m_adminExtTable.end()) + { + return false; + } + m_adminExtTable[group] = pAdmin; + + return true; +} + +void MQClientFactory::unregisterAdminExt(const std::string& group) +{ + kpr::ScopedWLock<kpr::RWMutex> lock(m_adminExtTableLock); + m_adminExtTable.erase(group); +} + +void MQClientFactory::rebalanceImmediately() +{ + m_pRebalanceService->wakeup(); +} + +void MQClientFactory::doRebalance() +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_consumerTableLock); + std::map<std::string, MQConsumerInner*>::iterator it = m_consumerTable.begin(); + for (; it != m_consumerTable.end(); it++) + { + MQConsumerInner* impl = it->second; + if (impl != NULL) + { + try + { + impl->doRebalance(); + } + catch (std::exception& e) + { + RMQ_ERROR("doRebalance exception, %s", e.what()); + } + catch (...) + { + RMQ_ERROR("doRebalance unknow exception"); + } + } + } +} + +MQProducerInner* MQClientFactory::selectProducer(const std::string& group) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_producerTableLock); + std::map<std::string, MQProducerInner*>::iterator it = m_producerTable.find(group); + if (it != m_producerTable.end()) + { + return it->second; + } + + return NULL; +} + +MQConsumerInner* MQClientFactory::selectConsumer(const std::string& group) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_consumerTableLock); + std::map<std::string, MQConsumerInner*>::iterator it = m_consumerTable.find(group); + if (it != m_consumerTable.end()) + { + return it->second; + } + + return NULL; +} + +FindBrokerResult MQClientFactory::findBrokerAddressInAdmin(const std::string& brokerName) +{ + //TODO + FindBrokerResult result; + std::string brokerAddr; + bool slave = false; + bool found = false; + + kpr::ScopedRLock<kpr::RWMutex> lock(m_brokerAddrTableLock); + typeof(m_brokerAddrTable.begin()) it = m_brokerAddrTable.find(brokerName); + if (it != m_brokerAddrTable.end()) + { + // TODO more slave + typeof(it->second.begin()) it1 = it->second.begin(); + for (; it1 != it->second.end(); it1++) + { + int brockerId = it1->first; + brokerAddr = it1->second; + if (!brokerAddr.empty()) + { + found = true; + if (MixAll::MASTER_ID == brockerId) + { + slave = false; + } + else + { + slave = true; + } + break; + } + } + } + + if (found) + { + result.brokerAddr = brokerAddr; + result.slave = slave; + } + + return result; +} + +std::string MQClientFactory::findBrokerAddressInPublish(const std::string& brokerName) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_brokerAddrTableLock); + std::map<std::string, std::map<int, std::string> >::iterator it = m_brokerAddrTable.find(brokerName); + if (it != m_brokerAddrTable.end()) + { + std::map<int, std::string>::iterator it1 = it->second.find(MixAll::MASTER_ID); + if (it1 != it->second.end()) + { + return it1->second; + } + } + + return ""; +} + +FindBrokerResult MQClientFactory::findBrokerAddressInSubscribe(const std::string& brokerName, + long brokerId, + bool onlyThisBroker) +{ + std::string brokerAddr = ""; + bool slave = false; + bool found = false; + + kpr::ScopedRLock<kpr::RWMutex> lock(m_brokerAddrTableLock); + std::map<std::string, std::map<int, std::string> >::iterator it = m_brokerAddrTable.find(brokerName); + if (it != m_brokerAddrTable.end()) + { + std::map<int, std::string>::iterator it1 = it->second.find(brokerId); + if (it1 != it->second.end()) + { + brokerAddr = it1->second; + slave = (brokerId != MixAll::MASTER_ID); + found = true; + } + else + { + it1 = it->second.begin(); + brokerAddr = it1->second; + slave = (brokerId != MixAll::MASTER_ID); + found = true; + } + } + + FindBrokerResult result; + result.brokerAddr = brokerAddr; + result.slave = slave; + + return result; +} + +std::list<std::string> MQClientFactory::findConsumerIdList(const std::string& topic, const std::string& group) +{ + std::string brokerAddr = findBrokerAddrByTopic(topic); + + if (brokerAddr.empty()) + { + updateTopicRouteInfoFromNameServer(topic); + brokerAddr = findBrokerAddrByTopic(topic); + } + + if (!brokerAddr.empty()) + { + try + { + return m_pMQClientAPIImpl->getConsumerIdListByGroup(brokerAddr, group, 3000); + } + catch (...) + { + RMQ_WARN("getConsumerIdListByGroup exception, %s, %s", brokerAddr.c_str(), group.c_str()); + } + } + + std::list<std::string> ids; + + return ids; +} + +std::string MQClientFactory::findBrokerAddrByTopic(const std::string& topic) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_topicRouteTableLock); + + std::map<std::string, TopicRouteData>::iterator it = m_topicRouteTable.find(topic); + if (it != m_topicRouteTable.end()) + { + const std::list<BrokerData>& brokers = it->second.getBrokerDatas(); + + if (!brokers.empty()) + { + BrokerData bd = brokers.front(); + return TopicRouteData::selectBrokerAddr(bd); + } + } + + return ""; +} + +TopicRouteData MQClientFactory::getAnExistTopicRouteData(const std::string& topic) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_topicRouteTableLock); + + std::map<std::string, TopicRouteData>::iterator it = m_topicRouteTable.find(topic); + if (it != m_topicRouteTable.end()) + { + return it->second; + } + + TopicRouteData data; + return data; +} + +MQClientAPIImpl* MQClientFactory::getMQClientAPIImpl() +{ + return m_pMQClientAPIImpl; +} + +MQAdminImpl* MQClientFactory::getMQAdminImpl() +{ + return m_pMQAdminImpl; +} + +std::string MQClientFactory::getClientId() +{ + return m_clientId; +} + +long long MQClientFactory::getBootTimestamp() +{ + return m_bootTimestamp; +} + +PullMessageService* MQClientFactory::getPullMessageService() +{ + return m_pPullMessageService; +} + + +DefaultMQProducer* MQClientFactory::getDefaultMQProducer() +{ + return m_pDefaultMQProducer; +} + +void MQClientFactory::sendHeartbeatToAllBroker() +{ + RMQ_DEBUG("sendHeartbeatToAllBroker begin"); + + HeartbeatData heartbeatData; + this->prepareHeartbeatData(heartbeatData); + + bool producerEmpty = heartbeatData.getProducerDataSet().empty(); + bool consumerEmpty = heartbeatData.getConsumerDataSet().empty(); + if (producerEmpty && consumerEmpty) + { + RMQ_ERROR("sending hearbeat, but no consumer and no producer"); + return; + } + + RMQ_DEBUG("clientId=%s, m_brokerAddrTable=%u", heartbeatData.getClientID().c_str(), (unsigned)m_brokerAddrTable.size()); + + kpr::ScopedRLock<kpr::RWMutex> lock(m_brokerAddrTableLock); + std::map<std::string, std::map<int, std::string> >::iterator it = m_brokerAddrTable.begin(); + for (; it != m_brokerAddrTable.end(); it++) + { + std::map<int, std::string>::iterator it1 = it->second.begin(); + for (; it1 != it->second.end(); it1++) + { + std::string& addr = it1->second; + if (!addr.empty()) + { + if (consumerEmpty) + { + if (it1->first != MixAll::MASTER_ID) + { + continue; + } + } + + try + { + m_pMQClientAPIImpl->sendHearbeat(addr, &heartbeatData, 3000); + RMQ_INFO("send heartbeat to broker[{%s} {%d} {%s}] success", + it->first.c_str(), it1->first, addr.c_str()); + RMQ_INFO("HeartbeatData %s", heartbeatData.toString().c_str()); + } + catch (...) + { + RMQ_ERROR("send heart beat to broker exception"); + } + } + } + } + + RMQ_DEBUG("sendHeartbeatToAllBroker end"); +} + +void MQClientFactory::prepareHeartbeatData(HeartbeatData& heartbeatData) +{ + // clientID + heartbeatData.setClientID(m_clientId); + + // Consumer + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_consumerTableLock); + std::map<std::string, MQConsumerInner*>::iterator it = m_consumerTable.begin(); + for (; it != m_consumerTable.end(); it++) + { + MQConsumerInner* inner = it->second; + if (inner) + { + ConsumerData consumerData; + consumerData.groupName = inner->groupName(); + consumerData.consumeType = inner->consumeType(); + consumerData.messageModel = inner->messageModel(); + consumerData.consumeFromWhere = inner->consumeFromWhere(); + consumerData.subscriptionDataSet = inner->subscriptions(); + + heartbeatData.getConsumerDataSet().insert(consumerData); + } + } + } + + // Producer + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_producerTableLock); + std::map<std::string, MQProducerInner*>::iterator it = m_producerTable.begin(); + for (; it != m_producerTable.end(); it++) + { + MQProducerInner* inner = it->second; + if (inner) + { + ProducerData producerData; + producerData.groupName = (it->first); + + heartbeatData.getProducerDataSet().insert(producerData); + } + } + } + + return; +} + +void MQClientFactory::makesureInstanceNameIsOnly(const std::string& instanceName) +{ + //TODO +} + + +void MQClientFactory::fetchNameServerAddr() +{ + //1000 * 10, 1000 * 60 * 2 + try + { + RMQ_DEBUG("Task: fetchNameServerAddr"); + m_pMQClientAPIImpl->fetchNameServerAddr(); + } + catch (...) + { + RMQ_ERROR("Task: fetchNameServerAddr exception"); + } +} + +void MQClientFactory::updateTopicRouteInfoFromNameServerTask() +{ + //10, 1000 * 30, m_clientConfig.getPollNameServerInteval() + try + { + RMQ_DEBUG("Task: updateTopicRouteInfoFromNameServerTask"); + updateTopicRouteInfoFromNameServer(); + } + catch (...) + { + RMQ_ERROR("Task: fetchNameServerAddr exception"); + } +} + +void MQClientFactory::cleanBroker() +{ + //1000, 1000 * 30, m_clientConfig.getHeartbeatBrokerInterval() + try + { + RMQ_DEBUG("Task: cleanBroker"); + cleanOfflineBroker(); + sendHeartbeatToAllBrokerWithLock(); + } + catch (...) + { + RMQ_ERROR("Task: cleanBroker exception"); + } +} + +void MQClientFactory::persistAllConsumerOffsetTask() +{ + //1000 * 10, 1000 * 5, m_clientConfig.getPersistConsumerOffsetInterval() + try + { + RMQ_DEBUG("Task: persistAllConsumerOffsetTask"); + persistAllConsumerOffset(); + } + catch (...) + { + RMQ_ERROR("Task: persistAllConsumerOffsetTask exception"); + } +} + +void MQClientFactory::recordSnapshotPeriodicallyTask() +{ + // 1000 * 10, 1000, + try + { + //RMQ_DEBUG("Task: recordSnapshotPeriodicallyTask"); + recordSnapshotPeriodically(); + } + catch (...) + { + RMQ_ERROR("Task: recordSnapshotPeriodically exception"); + } +} + +void MQClientFactory::logStatsPeriodicallyTask() +{ + // 1000 * 10, 1000 * 60 + try + { + RMQ_DEBUG("Task: logStatsPeriodicallyTask"); + logStatsPeriodically(); + } + catch (...) + { + RMQ_ERROR("Task: logStatsPeriodicallyTask exception"); + } +} + +void MQClientFactory::startScheduledTask() +{ + m_scheduledTaskIds[0] = m_timerTaskManager.RegisterTimer(1000 * 10, 1000 * 60 * 2, m_pFetchNameServerAddrTask); + + m_scheduledTaskIds[1] = m_timerTaskManager.RegisterTimer(10, m_clientConfig.getPollNameServerInterval(), m_pUpdateTopicRouteInfoFromNameServerTask); + + m_scheduledTaskIds[2] = m_timerTaskManager.RegisterTimer(1000, m_clientConfig.getHeartbeatBrokerInterval(), m_pCleanBrokerTask); + + m_scheduledTaskIds[3] = m_timerTaskManager.RegisterTimer(1000 * 10, m_clientConfig.getPersistConsumerOffsetInterval(), m_pPersistAllConsumerOffsetTask); + + m_scheduledTaskIds[4] = m_timerTaskManager.RegisterTimer(1000 * 10, 1000, m_pRecordSnapshotPeriodicallyTask); + m_scheduledTaskIds[5] = m_timerTaskManager.RegisterTimer(1000 * 10, 1000 * 60, m_pLogStatsPeriodicallyTask); +} + +void MQClientFactory::cleanOfflineBroker() +{ + RMQ_DEBUG("TryLock m_lockNamesrv: 0x%p", &m_lockNamesrv); + if (m_lockNamesrv.TryLock(MQClientFactory::LockTimeoutMillis)) + { + RMQ_DEBUG("TryLock m_lockNamesrv ok"); + std::map<std::string, std::map<int, std::string> > updatedTable; + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_brokerAddrTableLock); + std::map<std::string, std::map<int, std::string> >::iterator it = m_brokerAddrTable.begin(); + + for (; it != m_brokerAddrTable.end(); it++) + { + std::map<int, std::string> cloneTable = it->second; + + std::map<int, std::string>::iterator it1 = cloneTable.begin(); + + for (; it1 != cloneTable.end();) + { + std::string& addr = it1->second; + if (!isBrokerAddrExistInTopicRouteTable(addr)) + { + std::map<int, std::string>::iterator itTmp = it1; + it1++; + cloneTable.erase(itTmp); + continue; + } + + it1++; + } + + if (!cloneTable.empty()) + { + updatedTable[it->first] = cloneTable; + } + } + } + + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_brokerAddrTableLock); + m_brokerAddrTable.clear(); + m_brokerAddrTable = updatedTable; + } + + m_lockNamesrv.Unlock(); + RMQ_DEBUG("UnLock m_lockNamesrv ok"); + } + else + { + RMQ_DEBUG("TryLock m_lockNamesrv fail"); + } +} + +bool MQClientFactory::isBrokerAddrExistInTopicRouteTable(const std::string& addr) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_topicRouteTableLock); + + std::map<std::string, TopicRouteData>::iterator it = m_topicRouteTable.begin(); + for (; it != m_topicRouteTable.end(); it++) + { + const std::list<BrokerData>& brokers = it->second.getBrokerDatas(); + std::list<BrokerData>::const_iterator it1 = brokers.begin(); + + for (; it1 != brokers.end(); it1++) + { + std::map<int, std::string>::const_iterator it2 = (*it1).brokerAddrs.begin(); + for (; it2 != (*it1).brokerAddrs.end(); it2++) + { + if (it2->second.find(addr) != std::string::npos) + { + return true; + } + } + } + } + + return false; +} + +void MQClientFactory::recordSnapshotPeriodically() +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_consumerTableLock); + std::map<std::string, MQConsumerInner*>::iterator it = m_consumerTable.begin(); + for (; it != m_consumerTable.end(); it++) + { + MQConsumerInner* inner = it->second; + if (inner) + { + DefaultMQPushConsumerImpl* consumer = dynamic_cast<DefaultMQPushConsumerImpl*>(inner); + if (consumer) + { + consumer->getConsumerStatManager()->recordSnapshotPeriodically(); + } + } + } +} + +void MQClientFactory::logStatsPeriodically() +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_consumerTableLock); + std::map<std::string, MQConsumerInner*>::iterator it = m_consumerTable.begin(); + for (; it != m_consumerTable.end(); it++) + { + MQConsumerInner* inner = it->second; + if (inner) + { + DefaultMQPushConsumerImpl* consumer = dynamic_cast<DefaultMQPushConsumerImpl*>(inner); + if (consumer) + { + std::string group = it->first; + consumer->getConsumerStatManager()->logStatsPeriodically(group, m_clientId); + } + } + } +} + +void MQClientFactory::persistAllConsumerOffset() +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_consumerTableLock); + RMQ_DEBUG("persistAllConsumerOffset, m_consumerTable.size=%u", (unsigned)m_consumerTable.size()); + std::map<std::string, MQConsumerInner*>::iterator it = m_consumerTable.begin(); + for (; it != m_consumerTable.end(); it++) + { + MQConsumerInner* inner = it->second; + if (inner) + { + inner->persistConsumerOffset(); + } + } +} + +bool MQClientFactory::topicRouteDataIsChange(TopicRouteData& olddata, TopicRouteData& nowdata) +{ + TopicRouteData old = olddata; + TopicRouteData now = nowdata; + + old.getQueueDatas().sort(); + old.getBrokerDatas().sort(); + now.getQueueDatas().sort(); + now.getBrokerDatas().sort(); + + return !(old == now); + +} + +bool MQClientFactory::isNeedUpdateTopicRouteInfo(const std::string& topic) +{ + bool result = false; + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_producerTableLock); + std::map<std::string, MQProducerInner*>::iterator it = m_producerTable.begin(); + for (; it != m_producerTable.end(); it++) + { + MQProducerInner* inner = it->second; + if (inner) + { + result = inner->isPublishTopicNeedUpdate(topic); + } + } + } + + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_consumerTableLock); + std::map<std::string, MQConsumerInner*>::iterator it = m_consumerTable.begin(); + for (; it != m_consumerTable.end(); it++) + { + MQConsumerInner* inner = it->second; + if (inner) + { + result = inner->isSubscribeTopicNeedUpdate(topic); + } + } + } + + return result; +} + +void MQClientFactory::unregisterClientWithLock(const std::string& producerGroup, const std::string& consumerGroup) +{ + RMQ_DEBUG("TryLock m_lockHeartbeat: 0x%p", &m_lockHeartbeat); + if (m_lockHeartbeat.TryLock(MQClientFactory::LockTimeoutMillis)) + { + try + { + RMQ_DEBUG("TryLock m_lockHeartbeat ok"); + unregisterClient(producerGroup, consumerGroup); + } + catch (...) + { + RMQ_ERROR("unregisterClientWithLock exception, %s %s", + producerGroup.c_str(), consumerGroup.c_str()); + } + m_lockHeartbeat.Unlock(); + RMQ_DEBUG("Unlock m_lockHeartbeat ok"); + } + else + { + RMQ_WARN("TryLock m_lockHeartbeat fail"); + } +} + +void MQClientFactory::unregisterClient(const std::string& producerGroup, const std::string& consumerGroup) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_brokerAddrTableLock); + std::map<std::string, std::map<int, std::string> >::iterator it = m_brokerAddrTable.begin(); + for (; it != m_brokerAddrTable.end(); it++) + { + std::map<int, std::string>::iterator it1 = it->second.begin(); + + for (; it1 != it->second.end(); it1++) + { + std::string& addr = it1->second; + + if (!addr.empty()) + { + try + { + m_pMQClientAPIImpl->unregisterClient(addr, m_clientId, producerGroup, + consumerGroup, 3000); + } + catch (...) + { + RMQ_ERROR("unregister client exception from broker: %s", addr.c_str()); + } + } + } + } +} + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientFactory.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/MQClientFactory.h b/rocketmq-client4cpp/src/MQClientFactory.h new file mode 100755 index 0000000..8f56a27 --- /dev/null +++ b/rocketmq-client4cpp/src/MQClientFactory.h @@ -0,0 +1,214 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __MQCLIENTFACTORY_H__ +#define __MQCLIENTFACTORY_H__ + +#include <set> +#include <string> +#include <list> + +#include "SocketUtil.h" +#include "TopicRouteData.h" +#include "FindBrokerResult.h" +#include "ClientConfig.h" +#include "Mutex.h" +#include "ServiceState.h" +#include "TimerTaskManager.h" + +namespace rmq +{ + class ClientConfig; + class MessageQueue; + class MQAdminExtInner; + class MQClientAPIImpl; + class MQAdminImpl; + class PullMessageService; + class HeartbeatData; + class RemoteClientConfig; + class ClientRemotingProcessor; + class RebalanceService; + class DefaultMQProducer; + class TopicPublishInfo; + class MQProducerInner; + class MQConsumerInner; + class DefaultMQProducerImpl; + + class MQClientFactory + { + public: + MQClientFactory(ClientConfig& clientConfig, int factoryIndex, const std::string& clientId); + ~MQClientFactory(); + + void start(); + void shutdown(); + void sendHeartbeatToAllBrokerWithLock(); + void updateTopicRouteInfoFromNameServer(); + bool updateTopicRouteInfoFromNameServer(const std::string& topic); + + bool updateTopicRouteInfoFromNameServer(const std::string& topic, bool isDefault, + DefaultMQProducer* pDefaultMQProducer); + + static TopicPublishInfo* topicRouteData2TopicPublishInfo(const std::string& topic, + TopicRouteData& route); + + static std::set<MessageQueue>* topicRouteData2TopicSubscribeInfo(const std::string& topic, + TopicRouteData& route); + + bool registerConsumer(const std::string& group, MQConsumerInner* pConsumer); + void unregisterConsumer(const std::string& group); + + bool registerProducer(const std::string& group, DefaultMQProducerImpl* pProducer); + void unregisterProducer(const std::string& group); + + bool registerAdminExt(const std::string& group, MQAdminExtInner* pAdmin); + void unregisterAdminExt(const std::string& group); + + void rebalanceImmediately(); + void doRebalance(); + + MQProducerInner* selectProducer(const std::string& group); + MQConsumerInner* selectConsumer(const std::string& group); + + FindBrokerResult findBrokerAddressInAdmin(const std::string& brokerName); + std::string findBrokerAddressInPublish(const std::string& brokerName); + FindBrokerResult findBrokerAddressInSubscribe(// + const std::string& brokerName,// + long brokerId,// + bool onlyThisBroker); + + std::list<std::string> findConsumerIdList(const std::string& topic, const std::string& group); + std::string findBrokerAddrByTopic(const std::string& topic); + TopicRouteData getAnExistTopicRouteData(const std::string& topic); + MQClientAPIImpl* getMQClientAPIImpl(); + MQAdminImpl* getMQAdminImpl(); + std::string getClientId(); + long long getBootTimestamp(); + PullMessageService* getPullMessageService(); + DefaultMQProducer* getDefaultMQProducer(); + + private: + void sendHeartbeatToAllBroker(); + //HeartbeatData* prepareHeartbeatData(); + void prepareHeartbeatData(HeartbeatData& heartbeatData); + + void makesureInstanceNameIsOnly(const std::string& instanceName); + void startScheduledTask(); + + + void cleanOfflineBroker(); + bool isBrokerAddrExistInTopicRouteTable(const std::string& addr); + void recordSnapshotPeriodically(); + void logStatsPeriodically(); + void persistAllConsumerOffset(); + bool topicRouteDataIsChange(TopicRouteData& olddata, TopicRouteData& nowdata); + bool isNeedUpdateTopicRouteInfo(const std::string& topic); + void unregisterClientWithLock(const std::string& producerGroup, const std::string& consumerGroup); + void unregisterClient(const std::string& producerGroup, const std::string& consumerGroup); + + typedef void (MQClientFactory::*pScheduledFunc)(); + + class ScheduledTask : public kpr::TimerTask + { + public: + ScheduledTask(MQClientFactory* pMQClientFactory, pScheduledFunc pScheduled) + : m_pMQClientFactory(pMQClientFactory), m_pScheduled(pScheduled) + { + } + + virtual void DoTask() + { + (m_pMQClientFactory->*m_pScheduled)(); + } + + private: + MQClientFactory* m_pMQClientFactory; + pScheduledFunc m_pScheduled; + }; + typedef kpr::RefHandleT<ScheduledTask> ScheduledTaskPtr; + + // schedule task + void fetchNameServerAddr(); + void updateTopicRouteInfoFromNameServerTask(); + void cleanBroker(); + void persistAllConsumerOffsetTask(); + void recordSnapshotPeriodicallyTask(); + void logStatsPeriodicallyTask(); + + private: + static long LockTimeoutMillis; + ClientConfig m_clientConfig; + int m_factoryIndex; + std::string m_clientId; + long long m_bootTimestamp; + + // Producer + //group --> MQProducerInner + std::map<std::string, MQProducerInner*> m_producerTable; + kpr::RWMutex m_producerTableLock; + + // Consumer + //group --> MQConsumerInner + std::map<std::string, MQConsumerInner*> m_consumerTable; + kpr::RWMutex m_consumerTableLock; + + // AdminExt + // group --> MQAdminExtInner + std::map<std::string, MQAdminExtInner*> m_adminExtTable; + kpr::RWMutex m_adminExtTableLock; + + RemoteClientConfig* m_pRemoteClientConfig; + + MQClientAPIImpl* m_pMQClientAPIImpl; + MQAdminImpl* m_pMQAdminImpl; + + /// Topic---> TopicRouteData + std::map<std::string, TopicRouteData> m_topicRouteTable; + kpr::RWMutex m_topicRouteTableLock; + + kpr::Mutex m_mutex; + kpr::Mutex m_lockNamesrv; + + kpr::Mutex m_lockHeartbeat; + + //-----brokerName + // ------brokerid addr + // ------brokerid addr + std::map<std::string, std::map<int, std::string> > m_brokerAddrTable; + kpr::RWMutex m_brokerAddrTableLock; + + // ��ʱ�߳� + kpr::TimerTaskManager m_timerTaskManager; + ScheduledTaskPtr m_pFetchNameServerAddrTask; + ScheduledTaskPtr m_pUpdateTopicRouteInfoFromNameServerTask; + ScheduledTaskPtr m_pCleanBrokerTask; + ScheduledTaskPtr m_pPersistAllConsumerOffsetTask; + ScheduledTaskPtr m_pRecordSnapshotPeriodicallyTask; + ScheduledTaskPtr m_pLogStatsPeriodicallyTask; + + int m_scheduledTaskIds[6]; + + ClientRemotingProcessor* m_pClientRemotingProcessor; + PullMessageService* m_pPullMessageService; + RebalanceService* m_pRebalanceService; + DefaultMQProducer* m_pDefaultMQProducer; + ServiceState m_serviceState; + + //SOCKET m_datagramSocket; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientManager.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/MQClientManager.cpp b/rocketmq-client4cpp/src/MQClientManager.cpp new file mode 100755 index 0000000..b3041fc --- /dev/null +++ b/rocketmq-client4cpp/src/MQClientManager.cpp @@ -0,0 +1,75 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "MQClientManager.h" +#include "ScopedLock.h" +#include "MQClientFactory.h" +#include "ClientConfig.h" + +namespace rmq +{ + + +MQClientManager* MQClientManager::s_instance = new MQClientManager(); + +MQClientManager::MQClientManager() +{ + +} + +MQClientManager::~MQClientManager() +{ + +} + +MQClientManager* MQClientManager::getInstance() +{ + return s_instance; +} + +MQClientFactory* MQClientManager::getAndCreateMQClientFactory(ClientConfig& clientConfig) +{ + std::string clientId = clientConfig.buildMQClientId(); + kpr::ScopedLock<kpr::Mutex> lock(m_mutex); + std::map<std::string, MQClientFactory*>::iterator it = m_factoryTable.find(clientId); + + if (it != m_factoryTable.end()) + { + return it->second; + } + else + { + MQClientFactory* factory = new MQClientFactory(clientConfig, m_factoryIndexGenerator++, clientId); + + m_factoryTable[clientId] = factory; + + return factory; + } +} + +void MQClientManager::removeClientFactory(const std::string& clientId) +{ + kpr::ScopedLock<kpr::Mutex> lock(m_mutex); + std::map<std::string, MQClientFactory*>::iterator it = m_factoryTable.find(clientId); + + if (it != m_factoryTable.end()) + { + //delete it->second; + m_factoryTable.erase(it); + } +} + +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientManager.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/MQClientManager.h b/rocketmq-client4cpp/src/MQClientManager.h new file mode 100755 index 0000000..742f8bb --- /dev/null +++ b/rocketmq-client4cpp/src/MQClientManager.h @@ -0,0 +1,49 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __MQCLIENTMANAGER_H__ +#define __MQCLIENTMANAGER_H__ + +#include <string> +#include <map> +#include "Mutex.h" +#include "AtomicValue.h" + +namespace rmq +{ + class MQClientFactory; + class ClientConfig; + + class MQClientManager + { + public: + ~MQClientManager(); + static MQClientManager* getInstance(); + MQClientFactory* getAndCreateMQClientFactory(ClientConfig& clientConfig); + void removeClientFactory(const std::string& clientId); + + private: + MQClientManager(); + + private: + static MQClientManager* s_instance; + kpr::AtomicInteger m_factoryIndexGenerator; + std::map<std::string, MQClientFactory*> m_factoryTable; + kpr::Mutex m_mutex; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/Makefile ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/Makefile b/rocketmq-client4cpp/src/Makefile new file mode 100644 index 0000000..5d5f43b --- /dev/null +++ b/rocketmq-client4cpp/src/Makefile @@ -0,0 +1,26 @@ +#----------------------------------------------------------------------- +TARGET := librocketmq.a +CONFIG := +STRIP_FLAG := N +MFLAGS := ${BIT} +CFLAGS += -g -fPIC -Wno-deprecated -fno-strict-aliasing -fno-omit-frame-pointer +CFLAGS_32 += -march=i686 +INCLUDE += -I../include -I./common -I./kpr -I./protocol -I./message -I./transport -I./producer -I./consumer \ + -I./jsoncpp +LIB += -lz -lrt -lpthread +LIB32 += +LIB64 += +#----------------------------------------------------------------------- + +LOCAL_SRC += $(wildcard jsoncpp/*.cpp) +LOCAL_SRC += $(wildcard kpr/*.cpp) +LOCAL_SRC += $(wildcard common/*.cpp) +LOCAL_SRC += $(wildcard protocol/*.cpp) +LOCAL_SRC += $(wildcard message/*.cpp) +LOCAL_SRC += $(wildcard transport/*.cpp) +LOCAL_SRC += $(wildcard producer/*.cpp) +LOCAL_SRC += $(wildcard consumer/*.cpp) + +include ./Makefile.std +#----------------------------------------------------------------------- + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/Makefile.std ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/Makefile.std b/rocketmq-client4cpp/src/Makefile.std new file mode 100755 index 0000000..b23f260 --- /dev/null +++ b/rocketmq-client4cpp/src/Makefile.std @@ -0,0 +1,127 @@ +#------------------------------------------------------------------------------- +CC = gcc +CXX = g++ +CFLAGS += -g -fPIC -Wno-deprecated -Wall -pipe -fno-ident -fno-strict-aliasing -MMD -D_GNU_SOURCE -D_REENTRANT + +#------------------------------------------------------------------------------- +INCLUDE += -I./ +LIB_32 += -L./ +LIB_64 += -L./ + +LOCAL_SRC += $(sort $(wildcard *.cpp *.cc *.c)) +LOCAL_OBJ += $(patsubst %.cpp,%.o, $(patsubst %.cc,%.o, $(patsubst %.c,%.o, $(LOCAL_SRC)))) +DEP_FILE := $(foreach obj, $(LOCAL_OBJ), $(dir $(obj))$(basename $(notdir $(obj))).d) + +#----------------------------------------------------------------------------- +PLATFORM := $(strip $(shell echo `uname -m`)) +ifneq ($(MFLAGS),64) + ifneq ($(MFLAGS),32) + ifeq ($(PLATFORM),x86_64) + MFLAGS := 64 + else + MFLAGS := 32 + endif + endif +endif + +ifeq ($(MFLAGS),64) + ifneq ($(PLATFORM),x86_64) + MFLAGS := 32 + endif +endif + +ifeq ($(MFLAGS),32) + CFLAGS += -D_SYS_EPOLL_ $(CFLAGS_32) +else + CFLAGS += $(CFLAGS_64) +endif + +#----------------------------------------------------------------------------- +ifneq ($(PLATFORM),x86_64) + MFLAGS := 32 + LIB := $(LIB) $(LIB_32) +else + DEP_FILE_32 := $(foreach obj, $(DEP_FILE),$(patsubst %.d,%.32.d, $(obj))) + DEP_FILE_64 := $(foreach obj, $(DEP_FILE),$(patsubst %.d,%.64.d, $(obj))) + + LOCAL_OBJ_32 := $(foreach obj, $(LOCAL_OBJ),$(patsubst %.o,%.32.o,$(obj))) + LOCAL_OBJ_64 := $(foreach obj, $(LOCAL_OBJ),$(patsubst %.o,%.64.o,$(obj))) + + LOCAL_MOCK_OBJ_32 += $(filter-out %Server.32.o, $(LOCAL_OBJ_32)) UnitTest.32.o + LOCAL_MOCK_OBJ_64 += $(filter-out %Server.64.o, $(LOCAL_OBJ_64)) UnitTest.64.o + + CLEANFILE := $(LOCAL_OBJ_32) $(LOCAL_OBJ_64) + + ifeq ($(MFLAGS),64) + DEP_FILE := $(DEP_FILE_64) + LOCAL_OBJ := $(LOCAL_OBJ_64) + LIB := $(LIB) $(LIB_64) + LOCAL_MOCK_OBJ := $(LOCAL_MOCK_OBJ_64) + else + DEP_FILE := $(DEP_FILE_32) + LOCAL_OBJ := $(LOCAL_OBJ_32) + LIB := $(LIB) $(LIB_32) + LOCAL_MOCK_OBJ := $(LOCAL_MOCK_OBJ_32) + endif +endif + +#------------------------------------------------------------------------------- +all : $(LOCAL_OBJ) $(TARGET) $(TARGETS) + +$(filter %.a,$(TARGET)) : $(LOCAL_OBJ) + ar r $@ $(LOCAL_OBJ) + +$(filter %.so,$(TARGET)) : $(LOCAL_OBJ) + $(CXX) -m$(MFLAGS) $(CFLAGS) -shared -o $@ $^ $(INCLUDE) $(LIB) + +$(filter-out %.so %.a,$(TARGET)) : $(LOCAL_OBJ) + $(CXX) -m$(MFLAGS) $(CFLAGS) -o $@ $^ $(INCLUDE) $(LIB) + +$(filter-out %.so %.a %.y,$(TARGETS)) : % : %.$(MFLAGS).o + $(CXX) -m$(MFLAGS) $(CFLAGS) -o $@ $^ $(INCLUDE) $(LIB) + +utest : $(LOCAL_MOCK_OBJ) + $(CXX) -m$(MFLAGS) $(CFLAGS) -o UnitTest $^ $(INCLUDE) $(LIB) + +#---------------------------------------------------------------------------------- +uclean: + rm -vf $(LOCAL_MOCK_OBJ) $(TARGET) $(TARGETS) ${CLEANFILE} *.d.tmp gmon.out UnitTest + +clean: + rm -vf $(LOCAL_OBJ) $(TARGET) $(TARGETS) ${CLEANFILE} *.d.tmp gmon.out UnitTest + +cleanall: + rm -vf $(LOCAL_OBJ) $(TARGET) $(TARGETS) $(DEP_FILE) ${CLEANFILE} *.o *.d.tmp *.d gmon.out UnitTest + +ifneq ($(DEP_FILE),) +-include $(DEP_FILE) +endif + +#------------------------------------------------------------------------------- +%.32.o: %.cpp + $(CXX) -m32 $(CFLAGS) $(INCLUDE) -o $@ -c $< + +%.32.o: %.cc + $(CXX) -m32 $(CFLAGS) $(INCLUDE) -o $@ -c $< + +%.32.o: %.c + $(CC) -m32 $(CFLAGS) $(INCLUDE) -o $@ -c $< +#------------------------------------------------------------------------------- +%.64.o: %.cpp + $(CXX) -m64 $(CFLAGS) $(INCLUDE) -o $@ -c $< + +%.64.o: %.cc + $(CXX) -m64 $(CFLAGS) $(INCLUDE) -o $@ -c $< + +%.64.o: %.c + $(CC) -m64 $(CFLAGS) $(INCLUDE) -o $@ -c $< +#---------------------------------------------------------------------------------- +%.o: %.cpp + $(CXX) -m$(MFLAGS) $(CFLAGS) $(INCLUDE) -o $@ -c $< + +%.o: %.cc + $(CXX) -m$(MFLAGS) $(CFLAGS) $(INCLUDE) -o $@ -c $< + +%.o: %.c + $(CC) -m$(MFLAGS) $(CFLAGS) $(INCLUDE) -o $@ -c $< +#---------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/RocketMQClient.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/RocketMQClient.cpp b/rocketmq-client4cpp/src/RocketMQClient.cpp new file mode 100755 index 0000000..ec377b3 --- /dev/null +++ b/rocketmq-client4cpp/src/RocketMQClient.cpp @@ -0,0 +1,186 @@ +#include "RocketMQClient.h" +#include "AtomicValue.h" +#include "FileUtil.h" + +volatile int RocketMQUtil::_logFd = -1; +int RocketMQUtil::_logLevel = 0; +std::string RocketMQUtil::_logPath = ""; + +pid_t RocketMQUtil::getPid() +{ + static __thread pid_t pid = 0; + if (!pid) + { + pid = getpid(); + } + return pid; +} + +pid_t RocketMQUtil::getTid() +{ + static __thread pid_t pid = 0; + static __thread pid_t tid = 0; + if (!pid || !tid || pid != getpid()) + { + pid = getpid(); + tid = syscall(__NR_gettid); + } + return tid; +} + +int RocketMQUtil::getDiffDays(time_t tmFirst, time_t tmSecond) +{ + static struct timeb g_tb; + static bool g_tbInit = false; + + if(!g_tbInit) + { + ftime(&g_tb); + g_tbInit = true; + } + + return (tmSecond - g_tb.timezone*60)/86400 - (tmFirst - g_tb.timezone*60)/86400; +}; + + +std::string RocketMQUtil::tm2str(const time_t& t, const std::string& sFormat) +{ + struct tm stTm; + localtime_r(&t, &stTm); + + char sTimeString[255] = "\0"; + strftime(sTimeString, sizeof(sTimeString), sFormat.c_str(), &stTm); + + return std::string(sTimeString); +} + +std::string RocketMQUtil::now2str(const std::string& sFormat) +{ + time_t t = time(NULL); + return tm2str(t, sFormat.c_str()); +} + +std::string RocketMQUtil::now2str() +{ + return now2str("%Y-%m-%d %H:%M:%S"); +} + +int64_t RocketMQUtil::getNowMs() +{ + struct timeval tv; + gettimeofday(&tv, NULL); + return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000; +} + + +std::string RocketMQUtil::str2fmt(const char* format, ...) +{ + int dataLen = 0; + va_list args; + char buffer[8092]; + buffer[0] = buffer[sizeof(buffer) - 1] = '\0'; + + va_start( args, format ); + dataLen = ::vsnprintf(buffer, sizeof(buffer), format, args); + va_end(args); + + return std::string(buffer); +}; + + +int RocketMQUtil::initLog(const std::string& sLogPath) +{ + if (sLogPath.empty()) + { + return 0; + } + + const char *pLogLevel = getenv("ROCKETMQ_LOGLEVEL"); + if (pLogLevel != NULL) + { + int logLevel = atoi(pLogLevel); + _logLevel = logLevel; + _logPath = sLogPath; + } + else + { + _logLevel = WARN_LOG; + _logPath = sLogPath; + } + + std::string logDir = kpr::FileUtil::extractFilePath(_logPath); + if (!kpr::FileUtil::isFileExist(logDir, S_IFDIR)) + { + kpr::FileUtil::makeDirRecursive(logDir); + } + + return 0; +} + +void RocketMQUtil::setLogLevel(int logLevel) +{ + _logLevel = logLevel; +} + + +void RocketMQUtil::writeLog(const char* fmt, ...) +{ + if (_logPath.empty()) + { + return; + } + + static volatile time_t last_time = 0; + static std::string last_time_str = ""; + time_t old = last_time; + time_t now = time(NULL); + + if (now - last_time >= 5) + { + if (__sync_bool_compare_and_swap(&last_time, old, now)) + { + std::string time_str = tm2str(now, "%Y%m%d"); + if (_logFd < 0 || time_str != last_time_str) + { + int oldFd = _logFd; + std::string logFullPath = _logPath + "." + time_str; + _logFd = open(logFullPath.c_str(), O_CREAT | O_RDWR | O_APPEND, 0666); + if (_logFd > 0) + { + last_time_str = time_str; + } + + if (oldFd > 0) + { + close(oldFd); + } + } + } + } + + char buf[1024*128]; + buf[0] = buf[sizeof(buf) - 1] = '\0'; + + va_list ap; + va_start(ap, fmt); + int size = vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + + int logFd = _logFd; + if (logFd > 0 && (size > 0 && size < (int)sizeof(buf))) + { + int ret = write(logFd, buf, size); + if (ret < 0) + { + if (errno == EBADF) + { + write(_logFd, buf, size); + } + } + } + + return; +} + + + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/ConsumeStats.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/ConsumeStats.h b/rocketmq-client4cpp/src/common/ConsumeStats.h new file mode 100755 index 0000000..34ea817 --- /dev/null +++ b/rocketmq-client4cpp/src/common/ConsumeStats.h @@ -0,0 +1,95 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __CONSUMESTATS_H__ +#define __CONSUMESTATS_H__ + +#include <map> +#include "MessageQueue.h" + +namespace rmq +{ + typedef struct + { + long long brokerOffset; + long long consumerOffset; + long long lastTimestamp; + } OffsetWrapper; + + /** + * Consumer progress + * + * @author kangliqiang<[email protected]> + */ + class ConsumeStats + { + public: + ConsumeStats() + : m_consumeTps(0) + { + } + + ~ConsumeStats() + { + } + + long long computeTotalDiff() + { + + long long diffTotal = 0L; + + //Iterator<Entry<MessageQueue, OffsetWrapper>> it = m_offsetTable.entrySet().iterator(); + //while (it.hasNext()) { + // Entry<MessageQueue, OffsetWrapper> next = it.next(); + // long long diff = next.getValue().getBrokerOffset() - next.getValue().getConsumerOffset(); + // diffTotal += diff; + //} + + return diffTotal; + } + + + std::map<MessageQueue*, OffsetWrapper> getOffsetTable() + { + return m_offsetTable; + } + + + void setOffsetTable(const std::map<MessageQueue*, OffsetWrapper> offsetTable) + { + m_offsetTable = offsetTable; + } + + + long long getConsumeTps() + { + return m_consumeTps; + } + + + void setConsumeTps(long long consumeTps) + { + m_consumeTps = consumeTps; + } + + private: + + std::map<MessageQueue*, OffsetWrapper> m_offsetTable; + long long m_consumeTps; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/FilterAPI.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/FilterAPI.h b/rocketmq-client4cpp/src/common/FilterAPI.h new file mode 100755 index 0000000..3d87306 --- /dev/null +++ b/rocketmq-client4cpp/src/common/FilterAPI.h @@ -0,0 +1,72 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __FILTERAPI_H__ +#define __FILTERAPI_H__ + +#include <string> +#include "SubscriptionData.h" +#include "UtilAll.h" +#include "MQClientException.h" + +namespace rmq +{ + class FilterAPI + { + public: + static SubscriptionData* buildSubscriptionData(const std::string topic, const std::string& subString) + { + SubscriptionData* subscriptionData = new SubscriptionData(); + subscriptionData->setTopic(topic); + subscriptionData->setSubString(subString); + + if (subString.empty() || subString == SubscriptionData::SUB_ALL) + { + subscriptionData->setSubString(SubscriptionData::SUB_ALL); + } + else + { + std::vector<std::string> out; + + UtilAll::Split(out, subString, "||"); + + if (out.empty()) + { + THROW_MQEXCEPTION(MQClientException, "FilterAPI subString split error", -1); + } + + for (size_t i = 0; i < out.size(); i++) + { + std::string tag = out[i]; + if (!tag.empty()) + { + std::string trimString = UtilAll::Trim(tag); + + if (!trimString.empty()) + { + subscriptionData->getTagsSet().insert(trimString); + subscriptionData->getCodeSet().insert(UtilAll::hashCode(trimString)); + } + } + } + } + + return subscriptionData; + } + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/MQVersion.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/MQVersion.cpp b/rocketmq-client4cpp/src/common/MQVersion.cpp new file mode 100755 index 0000000..1a03fa6 --- /dev/null +++ b/rocketmq-client4cpp/src/common/MQVersion.cpp @@ -0,0 +1,88 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "MQVersion.h" + +namespace rmq +{ + +int MQVersion::s_CurrentVersion = MQVersion::V3_2_6; + +const char* MQVersion::getVersionDesc(int value) +{ + switch (value) + { + case V3_0_0_BETA9_SNAPSHOT: + return "V3_0_0_BETA9_SNAPSHOT"; + case V3_0_0_BETA9: + return "V3_0_0_BETA9"; + case V3_0_0_FINAL: + return "V3_0_0_FINAL"; + case V3_0_1_SNAPSHOT: + return "V3_0_1_SNAPSHOT"; + case V3_0_1: + return "V3_0_1"; + case V3_0_2_SNAPSHOT: + return "V3_0_2_SNAPSHOT"; + case V3_0_2: + return "V3_0_2"; + case V3_0_3: + return "V3_0_3"; + case V3_0_4_SNAPSHOT: + return "V3_0_4_SNAPSHOT"; + case V3_0_4: + return "V3_0_4"; + case V3_0_5_SNAPSHOT: + return "V3_0_5_SNAPSHOT"; + case V3_0_5: + return "V3_0_5"; + case V3_0_6_SNAPSHOT: + return "V3_0_6_SNAPSHOT"; + case V3_0_6: + return "V3_0_6"; + case V3_0_7_SNAPSHOT: + return "V3_0_7_SNAPSHOT"; + case V3_0_7: + return "V3_0_7"; + + case V3_2_6_SNAPSHOT: + return "V3_2_6_SNAPSHOT"; + case V3_2_6: + return "V3_2_6"; + case V3_2_7_SNAPSHOT: + return "V3_2_7_SNAPSHOT"; + case V3_2_7: + return "V3_2_7"; + case V3_2_8_SNAPSHOT: + return "V3_2_8_SNAPSHOT"; + case V3_2_8: + return "V3_2_8"; + case V3_5_8_SNAPSHOT: + return "V3_5_8_SNAPSHOT"; + case V3_5_8: + return "V3_5_8"; + } + + return ""; +} + + +MQVersion::Version MQVersion::value2Version(int value) +{ + return (MQVersion::Version)value; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/MQVersion.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/MQVersion.h b/rocketmq-client4cpp/src/common/MQVersion.h new file mode 100755 index 0000000..d92957c --- /dev/null +++ b/rocketmq-client4cpp/src/common/MQVersion.h @@ -0,0 +1,184 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __MQVERSION_H__ +#define __MQVERSION_H__ + +#include <string> + +namespace rmq +{ + /** + * Version Info + * + */ + class MQVersion + { + public: + enum Version + { + V3_0_0_SNAPSHOT, + V3_0_0_ALPHA1, + V3_0_0_BETA1, + V3_0_0_BETA2, + V3_0_0_BETA3, + V3_0_0_BETA4, + V3_0_0_BETA5, + V3_0_0_BETA6_SNAPSHOT, + V3_0_0_BETA6, + V3_0_0_BETA7_SNAPSHOT, + V3_0_0_BETA7, + V3_0_0_BETA8_SNAPSHOT, + V3_0_0_BETA8, + V3_0_0_BETA9_SNAPSHOT, + V3_0_0_BETA9, + V3_0_0_FINAL, + V3_0_1_SNAPSHOT, + V3_0_1, + V3_0_2_SNAPSHOT, + V3_0_2, + + V3_0_3_SNAPSHOT, + V3_0_3, + V3_0_4_SNAPSHOT, + V3_0_4, + V3_0_5_SNAPSHOT, + V3_0_5, + V3_0_6_SNAPSHOT, + V3_0_6, + V3_0_7_SNAPSHOT, + V3_0_7, + V3_0_8_SNAPSHOT, + V3_0_8, + V3_0_9_SNAPSHOT, + V3_0_9, + V3_0_10_SNAPSHOT, + V3_0_10, + V3_0_11_SNAPSHOT, + V3_0_11, + V3_0_12_SNAPSHOT, + V3_0_12, + V3_0_13_SNAPSHOT, + V3_0_13, + V3_0_14_SNAPSHOT, + V3_0_14, + V3_0_15_SNAPSHOT, + V3_0_15, + V3_1_0_SNAPSHOT, + V3_1_0, + V3_1_1_SNAPSHOT, + V3_1_1, + V3_1_2_SNAPSHOT, + V3_1_2, + V3_1_3_SNAPSHOT, + V3_1_3, + V3_1_4_SNAPSHOT, + V3_1_4, + V3_1_5_SNAPSHOT, + V3_1_5, + V3_1_6_SNAPSHOT, + V3_1_6, + V3_1_7_SNAPSHOT, + V3_1_7, + V3_1_8_SNAPSHOT, + V3_1_8, + V3_1_9_SNAPSHOT, + V3_1_9, + V3_2_0_SNAPSHOT, + V3_2_0, + V3_2_1_SNAPSHOT, + V3_2_1, + V3_2_2_SNAPSHOT, + V3_2_2, + V3_2_3_SNAPSHOT, + V3_2_3, + V3_2_4_SNAPSHOT, + V3_2_4, + V3_2_5_SNAPSHOT, + V3_2_5, + V3_2_6_SNAPSHOT, + V3_2_6, + V3_2_7_SNAPSHOT, + V3_2_7, + V3_2_8_SNAPSHOT, + V3_2_8, + V3_2_9_SNAPSHOT, + V3_2_9, + V3_3_1_SNAPSHOT, + V3_3_1, + V3_3_2_SNAPSHOT, + V3_3_2, + V3_3_3_SNAPSHOT, + V3_3_3, + V3_3_4_SNAPSHOT, + V3_3_4, + V3_3_5_SNAPSHOT, + V3_3_5, + V3_3_6_SNAPSHOT, + V3_3_6, + V3_3_7_SNAPSHOT, + V3_3_7, + V3_3_8_SNAPSHOT, + V3_3_8, + V3_3_9_SNAPSHOT, + V3_3_9, + V3_4_1_SNAPSHOT, + V3_4_1, + V3_4_2_SNAPSHOT, + V3_4_2, + V3_4_3_SNAPSHOT, + V3_4_3, + V3_4_4_SNAPSHOT, + V3_4_4, + V3_4_5_SNAPSHOT, + V3_4_5, + V3_4_6_SNAPSHOT, + V3_4_6, + V3_4_7_SNAPSHOT, + V3_4_7, + V3_4_8_SNAPSHOT, + V3_4_8, + V3_4_9_SNAPSHOT, + V3_4_9, + V3_5_1_SNAPSHOT, + V3_5_1, + V3_5_2_SNAPSHOT, + V3_5_2, + V3_5_3_SNAPSHOT, + V3_5_3, + V3_5_4_SNAPSHOT, + V3_5_4, + V3_5_5_SNAPSHOT, + V3_5_5, + V3_5_6_SNAPSHOT, + V3_5_6, + V3_5_7_SNAPSHOT, + V3_5_7, + V3_5_8_SNAPSHOT, + V3_5_8, + V3_5_9_SNAPSHOT, + V3_5_9, + }; + + static const char* getVersionDesc(int value); + static Version value2Version(int value); + + public: + static int s_CurrentVersion; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/MessageSysFlag.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/MessageSysFlag.cpp b/rocketmq-client4cpp/src/common/MessageSysFlag.cpp new file mode 100755 index 0000000..329b71b --- /dev/null +++ b/rocketmq-client4cpp/src/common/MessageSysFlag.cpp @@ -0,0 +1,47 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "MessageSysFlag.h" + +namespace rmq +{ + +/** +* SysFlag +*/ +int MessageSysFlag::CompressedFlag = (0x1 << 0); +int MessageSysFlag::MultiTagsFlag = (0x1 << 1); + +/** +* 7 6 5 4 3 2 1 0<br> +* SysFlag for transaction +*/ +int MessageSysFlag::TransactionNotType = (0x0 << 2); +int MessageSysFlag::TransactionPreparedType = (0x1 << 2); +int MessageSysFlag::TransactionCommitType = (0x2 << 2); +int MessageSysFlag::TransactionRollbackType = (0x3 << 2); + +int MessageSysFlag::getTransactionValue(int flag) +{ + return flag & TransactionRollbackType; +} + +int MessageSysFlag::resetTransactionValue(int flag, int type) +{ + return (flag & (~TransactionRollbackType)) | type; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/MessageSysFlag.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/MessageSysFlag.h b/rocketmq-client4cpp/src/common/MessageSysFlag.h new file mode 100755 index 0000000..3950564 --- /dev/null +++ b/rocketmq-client4cpp/src/common/MessageSysFlag.h @@ -0,0 +1,46 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __MESSAGESYSFLAG_H__ +#define __MESSAGESYSFLAG_H__ + +namespace rmq +{ + class MessageSysFlag + { + public: + static int getTransactionValue(int flag); + static int resetTransactionValue(int flag, int type); + + public: + /** + * SysFlag + */ + static int CompressedFlag; + static int MultiTagsFlag; + + /** + * 7 6 5 4 3 2 1 0<br> + * SysFlag for transaction + */ + static int TransactionNotType; + static int TransactionPreparedType; + static int TransactionCommitType; + static int TransactionRollbackType; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/MixAll.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/MixAll.cpp b/rocketmq-client4cpp/src/common/MixAll.cpp new file mode 100755 index 0000000..417a5e5 --- /dev/null +++ b/rocketmq-client4cpp/src/common/MixAll.cpp @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "MixAll.h" +#include "FileUtil.h" + +namespace rmq +{ + +const std::string MixAll::DEFAULT_TOPIC = "TBW102"; +const std::string MixAll::BENCHMARK_TOPIC = "BenchmarkTest"; +const std::string MixAll::DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER"; +const std::string MixAll::DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER"; +const std::string MixAll::TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER"; +const std::string MixAll::CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER"; +const std::string MixAll::SELF_TEST_TOPIC = "SELF_TEST_TOPIC"; +const std::string MixAll::RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"; +const std::string MixAll::DLQ_GROUP_TOPIC_PREFIX = "%DLQ%"; +const std::string MixAll::NAMESRV_ADDR_ENV = "NAMESRV_ADDR"; +const std::string MixAll::ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME"; +const std::string MixAll::ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir"; +const std::string MixAll::MESSAGE_COMPRESS_LEVEL = "rocketmq.message.compressLevel"; +const std::string MixAll::ROCKETMQ_NAMESRV_DOMAIN = "172.30.30.125"; + +std::string MixAll::getRetryTopic(const std::string& consumerGroup) +{ + return RETRY_GROUP_TOPIC_PREFIX + consumerGroup; +} + +bool MixAll::compareAndIncreaseOnly(kpr::AtomicLong& target, long long value) +{ + long long current = target.get(); + while (value > current) + { + long long tmp = target.getAndSet(current, value); + + if (tmp == current) + { + return true; + } + + current = target.get(); + } + + return false; +} + + +std::string MixAll::file2String(const std::string& fileName) +{ + return kpr::FileUtil::load2str(fileName); +} + +void MixAll::string2File(const std::string& fileName, const std::string& fileData) +{ + // write tmp file + std::string tmpFile = fileName + ".tmp"; + kpr::FileUtil::save2file(tmpFile, fileData); + + // backup old file + std::string bakFile = fileName + ".bak"; + std::string oldFileData = kpr::FileUtil::load2str(fileName); + if (!oldFileData.empty()) + { + kpr::FileUtil::save2file(bakFile, oldFileData); + } + + // delete old file + std::remove(fileName.c_str()); + + // rename file + std::rename(tmpFile.c_str(), fileName.c_str()); +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/MixAll.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/MixAll.h b/rocketmq-client4cpp/src/common/MixAll.h new file mode 100755 index 0000000..797a0ae --- /dev/null +++ b/rocketmq-client4cpp/src/common/MixAll.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __MIXALL_H__ +#define __MIXALL_H__ + +#include <string> +#include <vector> +#include <iostream> +#include <fstream> +#include <unistd.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <dirent.h> +#include <fnmatch.h> +#include <arpa/inet.h> +#include <ifaddrs.h> + +#include "AtomicValue.h" + +namespace rmq +{ + class MixAll + { + public: + static const long MASTER_ID = 0L; + static const std::string DEFAULT_TOPIC; + static const std::string BENCHMARK_TOPIC; + static const std::string DEFAULT_PRODUCER_GROUP; + static const std::string DEFAULT_CONSUMER_GROUP; + static const std::string TOOLS_CONSUMER_GROUP; + static const std::string CLIENT_INNER_PRODUCER_GROUP; + static const std::string SELF_TEST_TOPIC; + static const std::string RETRY_GROUP_TOPIC_PREFIX; + static const std::string DLQ_GROUP_TOPIC_PREFIX; + static const std::string NAMESRV_ADDR_ENV; + static const std::string ROCKETMQ_HOME_ENV; + static const std::string ROCKETMQ_HOME_PROPERTY; + static const std::string MESSAGE_COMPRESS_LEVEL; + static const std::string ROCKETMQ_NAMESRV_DOMAIN; + + static std::string getRetryTopic(const std::string& consumerGroup); + static bool compareAndIncreaseOnly(kpr::AtomicLong& target, long long value); + static std::string file2String(const std::string& fileName); + static void string2File(const std::string& fileName, const std::string& fileData); + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/NamesrvConfig.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/NamesrvConfig.h b/rocketmq-client4cpp/src/common/NamesrvConfig.h new file mode 100755 index 0000000..608a4d9 --- /dev/null +++ b/rocketmq-client4cpp/src/common/NamesrvConfig.h @@ -0,0 +1,72 @@ +/** +* Copyright (C) 2013 kangliqiang, [email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __NAMESRVCONFIG_H__ +#define __NAMESRVCONFIG_H__ + +#include <stdlib.h> +#include <string> + +namespace rmq +{ +/** + * Name server Config + * + */ +class NamesrvConfig +{ +public: + NamesrvConfig() + { + m_kvConfigPath = ""; + + char* home = getenv(MixAll::ROCKETMQ_HOME_ENV.c_str()); + if (home) + { + m_rocketmqHome = home; + } + else + { + m_rocketmqHome = ""; + } + } + + const std::string& getRocketmqHome() + { + return m_rocketmqHome; + } + + void setRocketmqHome(const std::string& rocketmqHome) + { + m_rocketmqHome = rocketmqHome; + } + + const std::string& getKvConfigPath() + { + return m_kvConfigPath; + } + + void setKvConfigPath(const std::string& kvConfigPath) + { + m_kvConfigPath = kvConfigPath; + } + +private: + std::string m_rocketmqHome; + std::string m_kvConfigPath; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/NamesrvUtil.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/common/NamesrvUtil.h b/rocketmq-client4cpp/src/common/NamesrvUtil.h new file mode 100755 index 0000000..4f3639c --- /dev/null +++ b/rocketmq-client4cpp/src/common/NamesrvUtil.h @@ -0,0 +1,29 @@ +/** +* Copyright (C) 2013 kangliqiang, [email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __NAMESRVUTIL_H__ +#define __NAMESRVUTIL_H__ + +namespace rmq +{ + namespace NamesrvUtil + { + const char* NAMESPACE_ORDER_TOPIC_CONFIG = "ORDER_TOPIC_CONFIG"; + const char* NAMESPACE_PROJECT_CONFIG = "PROJECT_CONFIG"; + }; +} + +#endif
