http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/MessageQueue.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/message/MessageQueue.cpp b/rocketmq-client4cpp/src/message/MessageQueue.cpp new file mode 100755 index 0000000..d632550 --- /dev/null +++ b/rocketmq-client4cpp/src/message/MessageQueue.cpp @@ -0,0 +1,153 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#include "MessageQueue.h" + +#include <string> +#include <sstream> +#include <UtilAll.h> + +namespace rmq +{ + +MessageQueue::MessageQueue() + : m_queueId(0) +{ +} + +MessageQueue::MessageQueue(const std::string& topic, const std::string& brokerName, int queueId) + : m_topic(topic), m_brokerName(brokerName), m_queueId(queueId) +{ + +} + +std::string MessageQueue::getTopic()const +{ + return m_topic; +} + +void MessageQueue::setTopic(const std::string& topic) +{ + m_topic = topic; +} + +std::string MessageQueue::getBrokerName()const +{ + return m_brokerName; +} + +void MessageQueue::setBrokerName(const std::string& brokerName) +{ + m_brokerName = brokerName; +} + +int MessageQueue::getQueueId()const +{ + return m_queueId; +} + +void MessageQueue::setQueueId(int queueId) +{ + m_queueId = queueId; +} + +int MessageQueue::hashCode() +{ + /* + final int prime = 31; + int result = 1; + result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode()); + result = prime * result + queueId; + result = prime * result + ((topic == null) ? 0 : topic.hashCode()); + return result; + */ + std::stringstream ss; + ss << m_topic << m_brokerName << m_queueId; + return UtilAll::hashCode(ss.str()); +} + +std::string MessageQueue::toString() const +{ + std::stringstream ss; + ss << "{topic=" << m_topic + << ",brokerName=" << m_brokerName + << ",queueId=" << m_queueId << "}"; + return ss.str(); +} + + +std::string MessageQueue::toJsonString() const +{ + std::stringstream ss; + ss << "{\"topic\":\"" << m_topic + << "\",\"brokerName\":\"" << m_brokerName + << "\",\"queueId\":" << m_queueId << "}"; + return ss.str(); +} + + +bool MessageQueue::operator==(const MessageQueue& mq)const +{ + if (this == &mq) + { + return true; + } + + if (m_brokerName != mq.m_brokerName) + { + return false; + } + + if (m_queueId != mq.m_queueId) + { + return false; + } + + if (m_topic != mq.m_topic) + { + return false; + } + + return true; +} + +int MessageQueue::compareTo(const MessageQueue& mq)const +{ + { + int result = strcmp(m_topic.c_str(), mq.m_topic.c_str()); + if (result != 0) + { + return result; + } + } + + { + int result = strcmp(m_brokerName.c_str(), mq.m_brokerName.c_str()); + if (result != 0) + { + return result; + } + } + + return m_queueId - mq.m_queueId; +} + +bool MessageQueue::operator<(const MessageQueue& mq)const +{ + return compareTo(mq) < 0; +} + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp b/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp new file mode 100755 index 0000000..dcad654 --- /dev/null +++ b/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp @@ -0,0 +1,277 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License") +{ +} +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "DefaultMQProducer.h" + +#include <assert.h> +#include "MessageExt.h" +#include "QueryResult.h" +#include "DefaultMQProducerImpl.h" +#include "MixAll.h" +#include "MQClientException.h" + +namespace rmq +{ + +DefaultMQProducer::DefaultMQProducer() + : m_producerGroup(MixAll::DEFAULT_PRODUCER_GROUP), + m_createTopicKey(MixAll::DEFAULT_TOPIC), + m_defaultTopicQueueNums(4), + m_sendMsgTimeout(3000), + m_compressMsgBodyOverHowmuch(1024 * 4), + m_retryTimesWhenSendFailed(2), + m_retryAnotherBrokerWhenNotStoreOK(false), + m_maxMessageSize(1024 * 128), + m_compressLevel(5) +{ + m_pDefaultMQProducerImpl = new DefaultMQProducerImpl(this); +} + +DefaultMQProducer::DefaultMQProducer(const std::string& producerGroup) + : m_producerGroup(producerGroup), + m_createTopicKey(MixAll::DEFAULT_TOPIC), + m_defaultTopicQueueNums(4), + m_sendMsgTimeout(3000), + m_compressMsgBodyOverHowmuch(1024 * 4), + m_retryTimesWhenSendFailed(2), + m_retryAnotherBrokerWhenNotStoreOK(false), + m_maxMessageSize(1024 * 128), + m_compressLevel(5) +{ + m_pDefaultMQProducerImpl = new DefaultMQProducerImpl(this); +} + +DefaultMQProducer::~DefaultMQProducer() +{ + // memleak: maybe core + delete m_pDefaultMQProducerImpl; +} + + +void DefaultMQProducer::start() +{ + m_pDefaultMQProducerImpl->start(); +} + +void DefaultMQProducer::shutdown() +{ + m_pDefaultMQProducerImpl->shutdown(); +} + +std::vector<MessageQueue>* DefaultMQProducer::fetchPublishMessageQueues(const std::string& topic) +{ + return m_pDefaultMQProducerImpl->fetchPublishMessageQueues(topic); +} + +SendResult DefaultMQProducer::send(Message& msg) +{ + return m_pDefaultMQProducerImpl->send(msg); +} + +void DefaultMQProducer::send(Message& msg, SendCallback* pSendCallback) +{ + m_pDefaultMQProducerImpl->send(msg, pSendCallback); +} + +void DefaultMQProducer::sendOneway(Message& msg) +{ + m_pDefaultMQProducerImpl->sendOneway(msg); +} + +SendResult DefaultMQProducer::send(Message& msg, MessageQueue& mq) +{ + return m_pDefaultMQProducerImpl->send(msg, mq); +} + +void DefaultMQProducer::send(Message& msg, MessageQueue& mq, SendCallback* pSendCallback) +{ + m_pDefaultMQProducerImpl->send(msg, mq, pSendCallback); +} + +void DefaultMQProducer::sendOneway(Message& msg, MessageQueue& mq) +{ + m_pDefaultMQProducerImpl->sendOneway(msg, mq); +} + +SendResult DefaultMQProducer::send(Message& msg, MessageQueueSelector* pSelector, void* arg) +{ + return m_pDefaultMQProducerImpl->send(msg, pSelector, arg); +} + +void DefaultMQProducer::send(Message& msg, + MessageQueueSelector* pSelector, + void* arg, + SendCallback* pSendCallback) +{ + m_pDefaultMQProducerImpl->send(msg, pSelector, arg, pSendCallback); +} + +void DefaultMQProducer::sendOneway(Message& msg, MessageQueueSelector* pSelector, void* arg) +{ + m_pDefaultMQProducerImpl->sendOneway(msg, pSelector, arg); +} + +TransactionSendResult DefaultMQProducer::sendMessageInTransaction(Message& msg, + LocalTransactionExecuter* tranExecuter, void* arg) +{ + THROW_MQEXCEPTION(MQClientException, + "sendMessageInTransaction not implement, please use TransactionMQProducer class", -1); + TransactionSendResult result; + + return result; +} + +void DefaultMQProducer::createTopic(const std::string& key, const std::string& newTopic, int queueNum) +{ + m_pDefaultMQProducerImpl->createTopic(key, newTopic, queueNum); +} + +long long DefaultMQProducer::searchOffset(const MessageQueue& mq, long long timestamp) +{ + return m_pDefaultMQProducerImpl->searchOffset(mq, timestamp); +} + +long long DefaultMQProducer::maxOffset(const MessageQueue& mq) +{ + return m_pDefaultMQProducerImpl->maxOffset(mq); +} + +long long DefaultMQProducer::minOffset(const MessageQueue& mq) +{ + return m_pDefaultMQProducerImpl->minOffset(mq); +} + +long long DefaultMQProducer::earliestMsgStoreTime(const MessageQueue& mq) +{ + return m_pDefaultMQProducerImpl->earliestMsgStoreTime(mq); +} + +MessageExt* DefaultMQProducer::viewMessage(const std::string& msgId) +{ + return m_pDefaultMQProducerImpl->viewMessage(msgId); +} + +QueryResult DefaultMQProducer::queryMessage(const std::string& topic, + const std::string& key, + int maxNum, + long long begin, + long long end) +{ + + return m_pDefaultMQProducerImpl->queryMessage(topic, key, maxNum, begin, end); +} + +std::string DefaultMQProducer::getProducerGroup() +{ + return m_producerGroup; +} + +void DefaultMQProducer::setProducerGroup(const std::string& producerGroup) +{ + m_producerGroup = producerGroup; +} + +std::string DefaultMQProducer::getCreateTopicKey() +{ + return m_createTopicKey; +} + +void DefaultMQProducer::setCreateTopicKey(const std::string& createTopicKey) +{ + m_createTopicKey = createTopicKey; +} + +int DefaultMQProducer::getSendMsgTimeout() +{ + return m_sendMsgTimeout; +} + +void DefaultMQProducer::setSendMsgTimeout(int sendMsgTimeout) +{ + m_sendMsgTimeout = sendMsgTimeout; +} + +int DefaultMQProducer::getCompressMsgBodyOverHowmuch() +{ + return m_compressMsgBodyOverHowmuch; +} + +void DefaultMQProducer::setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) +{ + m_compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; +} + +DefaultMQProducerImpl* DefaultMQProducer::getDefaultMQProducerImpl() +{ + return m_pDefaultMQProducerImpl; +} + +bool DefaultMQProducer::isRetryAnotherBrokerWhenNotStoreOK() +{ + return m_retryAnotherBrokerWhenNotStoreOK; +} + +void DefaultMQProducer::setRetryAnotherBrokerWhenNotStoreOK(bool retryAnotherBrokerWhenNotStoreOK) +{ + m_retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK; +} + +int DefaultMQProducer::getMaxMessageSize() +{ + return m_maxMessageSize; +} + +void DefaultMQProducer::setMaxMessageSize(int maxMessageSize) +{ + m_maxMessageSize = maxMessageSize; +} + +int DefaultMQProducer::getDefaultTopicQueueNums() +{ + return m_defaultTopicQueueNums; +} + +void DefaultMQProducer::setDefaultTopicQueueNums(int defaultTopicQueueNums) +{ + m_defaultTopicQueueNums = defaultTopicQueueNums; +} + +int DefaultMQProducer::getRetryTimesWhenSendFailed() +{ + return m_retryTimesWhenSendFailed; +} + +void DefaultMQProducer::setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) +{ + m_retryTimesWhenSendFailed = retryTimesWhenSendFailed; +} + +int DefaultMQProducer::getCompressLevel() +{ + return m_compressLevel; +} + +void DefaultMQProducer::setCompressLevel(int compressLevel) +{ + assert(compressLevel >= 0 && compressLevel <= 9 || compressLevel == -1); + + m_compressLevel = compressLevel; +} + + +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp new file mode 100755 index 0000000..26b3f0b --- /dev/null +++ b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp @@ -0,0 +1,932 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License") +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "DefaultMQProducerImpl.h" +#include "DefaultMQProducer.h" +#include "MessageExt.h" +#include "QueryResult.h" +#include "TopicPublishInfo.h" +#include "MQClientException.h" +#include "LocalTransactionExecuter.h" +#include "SendMessageHook.h" +#include "MQClientManager.h" +#include "MQClientFactory.h" +#include "Validators.h" +#include "MQAdminImpl.h" +#include "MQClientAPIImpl.h" +#include "MessageSysFlag.h" +#include "CommandCustomHeader.h" +#include "KPRUtil.h" +#include "MessageDecoder.h" +#include "MessageQueueSelector.h" +#include "MQProtos.h" +#include "RemotingCommand.h" +#include "UtilAll.h" + + + +namespace rmq +{ + +DefaultMQProducerImpl::DefaultMQProducerImpl(DefaultMQProducer + *pDefaultMQProducer) + : m_pDefaultMQProducer(pDefaultMQProducer), + m_serviceState(CREATE_JUST), + m_pMQClientFactory(NULL) +{ +} + +DefaultMQProducerImpl::~DefaultMQProducerImpl() +{ + //delete m_pMQClientFactory; +} + + +void DefaultMQProducerImpl::start() +{ + start(true); +} + +void DefaultMQProducerImpl::start(bool startFactory) +{ + RMQ_DEBUG("DefaultMQProducerImpl::start()"); + + switch (m_serviceState) + { + case CREATE_JUST: + { + RMQ_INFO("the producer [{%s}] start beginning.", + m_pDefaultMQProducer->getProducerGroup().c_str()); + + m_serviceState = START_FAILED; + checkConfig(); + + if (m_pDefaultMQProducer->getProducerGroup() != + MixAll::CLIENT_INNER_PRODUCER_GROUP) + { + m_pDefaultMQProducer->changeInstanceNameToPID(); + } + + m_pMQClientFactory = + MQClientManager::getInstance()->getAndCreateMQClientFactory( + *m_pDefaultMQProducer); + bool registerOK = m_pMQClientFactory->registerProducer( + m_pDefaultMQProducer->getProducerGroup(), this); + + if (!registerOK) + { + m_serviceState = CREATE_JUST; + THROW_MQEXCEPTION(MQClientException, + "The producer group[" + m_pDefaultMQProducer->getProducerGroup() + + "] has been created before, specify another name please.", -1); + } + + m_topicPublishInfoTable[m_pDefaultMQProducer->getCreateTopicKey()] = + TopicPublishInfo(); + + if (startFactory) + { + m_pMQClientFactory->start(); + } + + RMQ_INFO("the producer [%s] start OK", m_pDefaultMQProducer->getProducerGroup().c_str()); + m_serviceState = RUNNING; + } + break; + + case RUNNING: + RMQ_ERROR("This client is already running."); + + case START_FAILED: + RMQ_ERROR("This client failed to start previously."); + + case SHUTDOWN_ALREADY: + RMQ_ERROR("This client has been shutted down."); + THROW_MQEXCEPTION(MQClientException, + "The producer service state not OK, maybe started once, ", -1); + + default: + break; + } + + m_pMQClientFactory->sendHeartbeatToAllBrokerWithLock(); +} + +void DefaultMQProducerImpl::shutdown() +{ + shutdown(true); +} + +void DefaultMQProducerImpl::shutdown(bool shutdownFactory) +{ + RMQ_DEBUG("DefaultMQProducerImpl::shutdown()"); + + switch (m_serviceState) + { + case CREATE_JUST: + break; + + case RUNNING: + m_pMQClientFactory->unregisterProducer( + m_pDefaultMQProducer->getProducerGroup()); + + if (shutdownFactory) + { + m_pMQClientFactory->shutdown(); + } + + RMQ_INFO("the producer [%s] shutdown OK", m_pDefaultMQProducer->getProducerGroup().c_str()); + m_serviceState = SHUTDOWN_ALREADY; + break; + + case SHUTDOWN_ALREADY: + break; + + default: + break; + } +} + + +void DefaultMQProducerImpl::initTransactionEnv() +{ + //TODO +} + +void DefaultMQProducerImpl::destroyTransactionEnv() +{ + //TODO +} + +bool DefaultMQProducerImpl::hasHook() +{ + return !m_hookList.empty(); +} + +void DefaultMQProducerImpl::registerHook(SendMessageHook* pHook) +{ + m_hookList.push_back(pHook); +} + +void DefaultMQProducerImpl::executeHookBefore(const SendMessageContext& context) +{ + std::list<SendMessageHook*>::iterator it = m_hookList.begin(); + + for (; it != m_hookList.end(); it++) + { + try + { + (*it)->sendMessageBefore(context); + } + catch (...) + { + RMQ_WARN("sendMessageBefore exception"); + } + } +} + +void DefaultMQProducerImpl::executeHookAfter(const SendMessageContext& context) +{ + std::list<SendMessageHook*>::iterator it = m_hookList.begin(); + + for (; it != m_hookList.end(); it++) + { + try + { + (*it)->sendMessageAfter(context); + } + catch (...) + { + RMQ_WARN("sendMessageAfter exception"); + } + } +} + + +std::set<std::string> DefaultMQProducerImpl::getPublishTopicList() +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock); + std::set<std::string> toplist; + std::map<std::string, TopicPublishInfo>::iterator it = + m_topicPublishInfoTable.begin(); + for (; it != m_topicPublishInfoTable.end(); it++) + { + toplist.insert(it->first); + } + + return toplist; +} + +bool DefaultMQProducerImpl::isPublishTopicNeedUpdate(const std::string& topic) +{ + kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock); + std::map<std::string, TopicPublishInfo>::iterator it = + m_topicPublishInfoTable.find(topic); + if (it != m_topicPublishInfoTable.end()) + { + return !it->second.ok(); + } + + return true; +} + +void DefaultMQProducerImpl::checkTransactionState(const std::string& addr, // + const MessageExt& msg, // + const CheckTransactionStateRequestHeader& checkRequestHeader) +{ + //TODO +} + +void DefaultMQProducerImpl::updateTopicPublishInfo(const std::string& topic, + TopicPublishInfo& info) +{ + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock); + std::map<std::string, TopicPublishInfo>::iterator it = + m_topicPublishInfoTable.find(topic); + if (it != m_topicPublishInfoTable.end()) + { + info.getSendWhichQueue() = it->second.getSendWhichQueue(); + RMQ_INFO("updateTopicPublishInfo prev is not null, %s", it->second.toString().c_str()); + } + m_topicPublishInfoTable[topic] = info; + } +} + +void DefaultMQProducerImpl::createTopic(const std::string& key, + const std::string& newTopic, int queueNum) +{ + makeSureStateOK(); + Validators::checkTopic(newTopic); + + m_pMQClientFactory->getMQAdminImpl()->createTopic(key, newTopic, queueNum); +} + +std::vector<MessageQueue>* DefaultMQProducerImpl::fetchPublishMessageQueues( + const std::string& topic) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->fetchPublishMessageQueues(topic); +} + +long long DefaultMQProducerImpl::searchOffset(const MessageQueue& mq, + long long timestamp) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp); +} + +long long DefaultMQProducerImpl::maxOffset(const MessageQueue& mq) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq); +} + +long long DefaultMQProducerImpl::minOffset(const MessageQueue& mq) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->minOffset(mq); +} + +long long DefaultMQProducerImpl::earliestMsgStoreTime(const MessageQueue& mq) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->earliestMsgStoreTime(mq); +} + +MessageExt* DefaultMQProducerImpl::viewMessage(const std::string& msgId) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->viewMessage(msgId); +} + +QueryResult DefaultMQProducerImpl::queryMessage(const std::string& topic, + const std::string& key, int maxNum, long long begin, long long end) +{ + makeSureStateOK(); + return m_pMQClientFactory->getMQAdminImpl()->queryMessage(topic, key, maxNum, + begin, end); +} + + +/** + * DEFAULT ASYNC ------------------------------------------------------- + */ +void DefaultMQProducerImpl::send(Message& msg, SendCallback* pSendCallback) +{ + send(msg, pSendCallback, m_pDefaultMQProducer->getSendMsgTimeout()); +} +void DefaultMQProducerImpl::send(Message& msg, SendCallback* pSendCallback, int timeout) +{ + try + { + sendDefaultImpl(msg, ASYNC, pSendCallback, timeout); + } + catch (MQBrokerException& e) + { + THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1); + } +} + + +/** + * DEFAULT ONEWAY ------------------------------------------------------- + */ +void DefaultMQProducerImpl::sendOneway(Message& msg) +{ + try + { + sendDefaultImpl(msg, ONEWAY, NULL, m_pDefaultMQProducer->getSendMsgTimeout()); + } + catch (MQBrokerException& e) + { + THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1); + } +} + + +/** + * KERNEL SYNC ------------------------------------------------------- + */ +SendResult DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq) +{ + return send(msg, mq, m_pDefaultMQProducer->getSendMsgTimeout()); +} +SendResult DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq, int timeout) +{ + makeSureStateOK(); + Validators::checkMessage(msg, m_pDefaultMQProducer); + + if (msg.getTopic() != mq.getTopic()) + { + THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1); + } + + return sendKernelImpl(msg, mq, SYNC, NULL, timeout); +} + + +/** + * KERNEL ASYNC ------------------------------------------------------- + */ +void DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq, + SendCallback* pSendCallback) +{ + return send(msg, mq, pSendCallback, m_pDefaultMQProducer->getSendMsgTimeout()); +} +void DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq, + SendCallback* pSendCallback, int timeout) +{ + makeSureStateOK(); + Validators::checkMessage(msg, m_pDefaultMQProducer); + + if (msg.getTopic() != mq.getTopic()) + { + THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1); + } + + try + { + sendKernelImpl(msg, mq, ASYNC, pSendCallback, timeout); + } + catch (MQBrokerException& e) + { + THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1); + } +} + +/** + * KERNEL ONEWAY ------------------------------------------------------- + */ +void DefaultMQProducerImpl::sendOneway(Message& msg, MessageQueue& mq) +{ + makeSureStateOK(); + Validators::checkMessage(msg, m_pDefaultMQProducer); + + if (msg.getTopic() != mq.getTopic()) + { + THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1); + } + + try + { + sendKernelImpl(msg, mq, ONEWAY, NULL, m_pDefaultMQProducer->getSendMsgTimeout()); + } + catch (MQBrokerException& e) + { + THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1); + } +} + + +/** + * SELECT SYNC ------------------------------------------------------- + */ +SendResult DefaultMQProducerImpl::send(Message& msg, + MessageQueueSelector* pSelector, void* arg) +{ + return send(msg, pSelector, arg, m_pDefaultMQProducer->getSendMsgTimeout()); +} +SendResult DefaultMQProducerImpl::send(Message& msg, + MessageQueueSelector* pSelector, void* arg, int timeout) +{ + return sendSelectImpl(msg, pSelector, arg, SYNC, NULL, timeout); +} + + +/** + * SELECT ASYNC ------------------------------------------------------- + */ +void DefaultMQProducerImpl::send(Message& msg, + MessageQueueSelector* pSelector, + void* arg, + SendCallback* pSendCallback) +{ + return send(msg, pSelector, arg, pSendCallback, m_pDefaultMQProducer->getSendMsgTimeout()); +} +void DefaultMQProducerImpl::send(Message& msg, + MessageQueueSelector* pSelector, + void* arg, + SendCallback* pSendCallback, + int timeout) +{ + try + { + sendSelectImpl(msg, pSelector, arg, ASYNC, pSendCallback, timeout); + } + catch (MQBrokerException& e) + { + THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1); + } +} + + +/** + * SELECT ONEWAY ------------------------------------------------------- + */ +void DefaultMQProducerImpl::sendOneway(Message& msg, + MessageQueueSelector* pSelector, void* arg) +{ + try + { + sendSelectImpl(msg, pSelector, arg, ONEWAY, NULL, + m_pDefaultMQProducer->getSendMsgTimeout()); + } + catch (MQBrokerException& e) + { + THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1); + } +} + + +/* + * Send with Transaction + */ +TransactionSendResult DefaultMQProducerImpl::sendMessageInTransaction( + Message& msg, + LocalTransactionExecuter* tranExecuter, void* arg) +{ + //TODO + TransactionSendResult result; + return result; +} + +void DefaultMQProducerImpl::endTransaction(// + SendResult sendResult, // + LocalTransactionState localTransactionState, // + MQClientException localException) +{ + //TODO +} + +/** + * DEFAULT SYNC ------------------------------------------------------- + */ +SendResult DefaultMQProducerImpl::send(Message& msg) +{ + return send(msg, m_pDefaultMQProducer->getSendMsgTimeout()); +} +SendResult DefaultMQProducerImpl::send(Message& msg, int timeout) +{ + return sendDefaultImpl(msg, SYNC, NULL, timeout); +} + + +std::map<std::string, TopicPublishInfo> DefaultMQProducerImpl::getTopicPublishInfoTable() +{ + return m_topicPublishInfoTable; +} + +MQClientFactory* DefaultMQProducerImpl::getMQClientFactory() +{ + return m_pMQClientFactory; +} + +int DefaultMQProducerImpl::getZipCompressLevel() +{ + return m_zipCompressLevel; +} + +void DefaultMQProducerImpl::setZipCompressLevel(int zipCompressLevel) +{ + m_zipCompressLevel = zipCompressLevel; +} + +ServiceState DefaultMQProducerImpl::getServiceState() { + return m_serviceState; +} + + +void DefaultMQProducerImpl::setServiceState(ServiceState serviceState) { + m_serviceState = serviceState; +} + + +SendResult DefaultMQProducerImpl::sendDefaultImpl(Message& msg, + CommunicationMode communicationMode, + SendCallback* pSendCallback, + int timeout) +{ + makeSureStateOK(); + Validators::checkMessage(msg, m_pDefaultMQProducer); + + long long maxTimeout = m_pDefaultMQProducer->getSendMsgTimeout() + 1000; + long long beginTimestamp = KPRUtil::GetCurrentTimeMillis(); + long long endTimestamp = beginTimestamp; + TopicPublishInfo& topicPublishInfo = tryToFindTopicPublishInfo(msg.getTopic()); + SendResult sendResult; + + if (topicPublishInfo.ok()) + { + MessageQueue* mq = NULL; + + int times = 0; + int timesTotal = 1 + m_pDefaultMQProducer->getRetryTimesWhenSendFailed(); + std::vector<std::string> brokersSent; + for (; times < timesTotal && int(endTimestamp - beginTimestamp) < maxTimeout; times++) + { + std::string lastBrokerName = (NULL == mq) ? "" : mq->getBrokerName(); + MessageQueue* tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName); + + if (tmpmq != NULL) + { + mq = tmpmq; + brokersSent.push_back(mq->getBrokerName()); + + try + { + sendResult = sendKernelImpl(msg, *mq, communicationMode, pSendCallback, timeout); + endTimestamp = KPRUtil::GetCurrentTimeMillis(); + + switch (communicationMode) + { + case ASYNC: + return sendResult; + + case ONEWAY: + return sendResult; + + case SYNC: + if (sendResult.getSendStatus() != SEND_OK) + { + if (m_pDefaultMQProducer->isRetryAnotherBrokerWhenNotStoreOK()) + { + continue; + } + } + + return sendResult; + + default: + break; + } + } + catch (RemotingException& e) + { + endTimestamp = KPRUtil::GetCurrentTimeMillis(); + continue; + } + catch (MQClientException& e) + { + endTimestamp = KPRUtil::GetCurrentTimeMillis(); + continue; + } + catch (MQBrokerException& e) + { + endTimestamp = KPRUtil::GetCurrentTimeMillis(); + + switch (e.GetError()) + { + case TOPIC_NOT_EXIST_VALUE: + case SERVICE_NOT_AVAILABLE_VALUE: + case SYSTEM_ERROR_VALUE: + case NO_PERMISSION_VALUE: + case NO_BUYER_ID_VALUE: + case NOT_IN_CURRENT_UNIT_VALUE: + continue; + default: + if (sendResult.hasResult()) + { + return sendResult; + } + throw; + } + } + catch (InterruptedException& e) + { + endTimestamp = KPRUtil::GetCurrentTimeMillis(); + throw; + } + } + else + { + break; + } + } // end of for + + std::string info = RocketMQUtil::str2fmt("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", + times, int(endTimestamp - beginTimestamp), msg.getTopic().c_str(), UtilAll::toString(brokersSent).c_str()); + RMQ_WARN("%s", info.c_str()); + THROW_MQEXCEPTION(MQClientException, info, -1); + return sendResult; + } + + std::vector<std::string> nsList = + getMQClientFactory()->getMQClientAPIImpl()->getNameServerAddressList(); + if (nsList.empty()) + { + THROW_MQEXCEPTION(MQClientException, "No name server address, please set it", -1); + } + + THROW_MQEXCEPTION(MQClientException, std::string("No route info of this topic, ") + msg.getTopic(), -1); +} + +SendResult DefaultMQProducerImpl::sendKernelImpl(Message& msg, + const MessageQueue& mq, + CommunicationMode communicationMode, + SendCallback* sendCallback, + int timeout) +{ + std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); + if (brokerAddr.empty()) + { + tryToFindTopicPublishInfo(mq.getTopic()); + brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); + } + + SendMessageContext context; + if (!brokerAddr.empty()) + { + try + { + int sysFlag = 0; + + if (tryToCompressMessage(msg)) + { + sysFlag |= MessageSysFlag::CompressedFlag; + } + + std::string tranMsg = msg.getProperty(Message::PROPERTY_TRANSACTION_PREPARED); + if (!tranMsg.empty() && tranMsg == "true") + { + sysFlag |= MessageSysFlag::TransactionPreparedType; + } + + // ִ��hook + if (hasHook()) + { + context.producerGroup = (m_pDefaultMQProducer->getProducerGroup()); + context.communicationMode = (communicationMode); + context.brokerAddr = (brokerAddr); + context.msg = (msg); + context.mq = (mq); + executeHookBefore(context); + } + + SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader(); + requestHeader->producerGroup = (m_pDefaultMQProducer->getProducerGroup()); + requestHeader->topic = (msg.getTopic()); + requestHeader->defaultTopic = (m_pDefaultMQProducer->getCreateTopicKey()); + requestHeader->defaultTopicQueueNums = (m_pDefaultMQProducer->getDefaultTopicQueueNums()); + requestHeader->queueId = (mq.getQueueId()); + requestHeader->sysFlag = (sysFlag); + requestHeader->bornTimestamp = (KPRUtil::GetCurrentTimeMillis()); + requestHeader->flag = (msg.getFlag()); + requestHeader->properties = (MessageDecoder::messageProperties2String(msg.getProperties())); + requestHeader->reconsumeTimes = 0; + + if (requestHeader->topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) == 0) + { + std::string reconsumeTimes = msg.getProperty(Message::PROPERTY_RECONSUME_TIME); + if (!reconsumeTimes.empty()) + { + requestHeader->reconsumeTimes = int(UtilAll::str2ll(reconsumeTimes.c_str())); + msg.clearProperty(Message::PROPERTY_RECONSUME_TIME); + } + + /* + 3.5.8 new features + std::string maxReconsumeTimes = msg.getProperty(Message::PROPERTY_MAX_RECONSUME_TIMES); + if (!maxReconsumeTimes.empty()) + { + requestHeader->maxReconsumeTimes = int(UtilAll::str2ll(maxReconsumeTimes.c_str())); + msg.clearProperty(Message::PROPERTY_MAX_RECONSUME_TIMES); + } + */ + } + + SendResult sendResult = m_pMQClientFactory->getMQClientAPIImpl()->sendMessage( + brokerAddr, + mq.getBrokerName(), + msg, + requestHeader, + timeout, + communicationMode, + sendCallback + ); + + if (hasHook()) + { + context.sendResult = (sendResult); + executeHookAfter(context); + } + + return sendResult; + } + catch (RemotingException& e) + { + if (hasHook()) + { + context.pException = (&e); + executeHookAfter(context); + } + RMQ_WARN("sendKernelImpl exception: %s, msg: %s", e.what(), msg.toString().c_str()); + throw; + } + catch (MQBrokerException& e) + { + if (hasHook()) + { + context.pException = (&e); + executeHookAfter(context); + } + RMQ_WARN("sendKernelImpl exception: %s, msg: %s", e.what(), msg.toString().c_str()); + throw; + } + catch (InterruptedException& e) + { + if (hasHook()) + { + context.pException = (&e); + executeHookAfter(context); + } + RMQ_WARN("sendKernelImpl exception: %s, msg: %s", e.what(), msg.toString().c_str()); + throw; + } + } + + THROW_MQEXCEPTION(MQClientException, std::string("The broker[") + mq.getBrokerName() + "] not exist", -1); +} + +SendResult DefaultMQProducerImpl::sendSelectImpl(Message& msg, + MessageQueueSelector* selector, + void* pArg, + CommunicationMode communicationMode, + SendCallback* sendCallback, + int timeout) +{ + makeSureStateOK(); + Validators::checkMessage(msg, m_pDefaultMQProducer); + + SendResult result; + TopicPublishInfo& topicPublishInfo = tryToFindTopicPublishInfo(msg.getTopic()); + SendResult sendResult; + + if (topicPublishInfo.ok()) + { + MessageQueue* mq = NULL; + + try + { + mq = selector->select(topicPublishInfo.getMessageQueueList(), msg, pArg); + } + catch (std::exception& e) + { + THROW_MQEXCEPTION(MQClientException, + std::string("select message queue throwed exception, ") + e.what(), -1); + } + catch (...) + { + THROW_MQEXCEPTION(MQClientException, "select message queue throwed exception, ", -1); + } + + if (mq != NULL) + { + return sendKernelImpl(msg, *mq, communicationMode, sendCallback, timeout); + } + else + { + THROW_MQEXCEPTION(MQClientException, "select message queue return null", -1); + } + } + + THROW_MQEXCEPTION(MQClientException, std::string("No route info of this topic, ") + msg.getTopic(), -1); +} + +void DefaultMQProducerImpl::makeSureStateOK() +{ + if (m_serviceState != RUNNING) + { + THROW_MQEXCEPTION(MQClientException, "The producer service state not OK, ", -1); + } +} + +void DefaultMQProducerImpl::checkConfig() +{ + Validators::checkGroup(m_pDefaultMQProducer->getProducerGroup()); + + if (m_pDefaultMQProducer->getProducerGroup().empty()) + { + THROW_MQEXCEPTION(MQClientException, "producerGroup is null", -1); + } + + if (m_pDefaultMQProducer->getProducerGroup() == MixAll::DEFAULT_PRODUCER_GROUP) + { + THROW_MQEXCEPTION(MQClientException, + std::string("producerGroup can not equal [") + MixAll::DEFAULT_PRODUCER_GROUP + "], please specify another one", + -1); + } +} + +TopicPublishInfo& DefaultMQProducerImpl::tryToFindTopicPublishInfo( + const std::string& topic) +{ + std::map<std::string, TopicPublishInfo>::iterator it; + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock); + it = m_topicPublishInfoTable.find(topic); + } + + if (it == m_topicPublishInfoTable.end() || !it->second.ok()) + { + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock); + m_topicPublishInfoTable[topic] = TopicPublishInfo(); + } + + m_pMQClientFactory->updateTopicRouteInfoFromNameServer(topic); + + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock); + it = m_topicPublishInfoTable.find(topic); + } + } + + if (it != m_topicPublishInfoTable.end() + && (it->second.ok() || it->second.isHaveTopicRouterInfo())) + { + return (it->second); + } + else + { + m_pMQClientFactory->updateTopicRouteInfoFromNameServer(topic, true, + m_pDefaultMQProducer); + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock); + it = m_topicPublishInfoTable.find(topic); + } + return (it->second); + } +} + +bool DefaultMQProducerImpl::tryToCompressMessage(Message& msg) +{ + if (msg.getBodyLen() >= m_pDefaultMQProducer->getCompressMsgBodyOverHowmuch()) + { + if (msg.tryToCompress(m_pDefaultMQProducer->getCompressLevel())) + { + return true; + } + } + + return false; +} + +TransactionCheckListener* DefaultMQProducerImpl::checkListener() +{ + return NULL; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h new file mode 100755 index 0000000..3df914c --- /dev/null +++ b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h @@ -0,0 +1,205 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __DEFAULTMQPRODUCERIMPL_H__ +#define __DEFAULTMQPRODUCERIMPL_H__ + +#include <list> +#include <vector> +#include "MQProducerInner.h" +#include "QueryResult.h" +#include "ServiceState.h" +#include "CommunicationMode.h" +#include "SendResult.h" +#include "MQClientException.h" +#include "Mutex.h" +#include "ScopedLock.h" + + +namespace rmq +{ + class DefaultMQProducer; + class SendMessageHook; + class SendMessageContext; + class MessageQueue; + class MessageExt; + class SendCallback; + class MessageQueueSelector; + class MQClientFactory; + class MQClientException; + class RemotingException; + class MQBrokerException; + class InterruptedException; + class LocalTransactionExecuter; + + + class DefaultMQProducerImpl : public MQProducerInner + { + public: + DefaultMQProducerImpl(DefaultMQProducer* pDefaultMQProducer); + ~DefaultMQProducerImpl(); + void initTransactionEnv(); + void destroyTransactionEnv(); + + bool hasHook(); + void registerHook(SendMessageHook* pHook); + void executeHookBefore(const SendMessageContext& context); + void executeHookAfter(const SendMessageContext& context); + + void start(); + void start(bool startFactory); + void shutdown(); + void shutdown(bool shutdownFactory); + + std::set<std::string> getPublishTopicList(); + bool isPublishTopicNeedUpdate(const std::string& topic); + + void checkTransactionState(const std::string& addr, + const MessageExt& msg, + const CheckTransactionStateRequestHeader& checkRequestHeader); + + void updateTopicPublishInfo(const std::string& topic, TopicPublishInfo& info); + virtual TransactionCheckListener* checkListener(); + + void createTopic(const std::string& key, const std::string& newTopic, int queueNum); + std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic); + + long long searchOffset(const MessageQueue& mq, long long timestamp); + long long maxOffset(const MessageQueue& mq); + long long minOffset(const MessageQueue& mq); + + long long earliestMsgStoreTime(const MessageQueue& mq); + + MessageExt* viewMessage(const std::string& msgId); + QueryResult queryMessage(const std::string& topic, + const std::string& key, + int maxNum, + long long begin, + long long end); + + /** + * DEFAULT ASYNC ------------------------------------------------------- + */ + void send(Message& msg, SendCallback* sendCallback); + void send(Message& msg, SendCallback* sendCallback, int timeout); + + /** + * DEFAULT ONEWAY ------------------------------------------------------- + */ + void sendOneway(Message& msg); + + /** + * KERNEL SYNC ------------------------------------------------------- + */ + SendResult send(Message& msg, MessageQueue& mq); + SendResult send(Message& msg, MessageQueue& mq, int timeout); + + /** + * KERNEL ASYNC ------------------------------------------------------- + */ + void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback); + void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback, int timeout); + + /** + * KERNEL ONEWAY ------------------------------------------------------- + */ + void sendOneway(Message& msg, MessageQueue& mq); + + /** + * SELECT SYNC ------------------------------------------------------- + */ + SendResult send(Message& msg, MessageQueueSelector* selector, void* arg); + SendResult send(Message& msg, MessageQueueSelector* selector, void* arg, int timeout); + + /** + * SELECT ASYNC ------------------------------------------------------- + */ + void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback); + void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback, int timeout); + + /** + * SELECT ONEWAY ------------------------------------------------------- + */ + void sendOneway(Message& msg, MessageQueueSelector* selector, void* arg); + + /** + * SEND with Transaction + */ + TransactionSendResult sendMessageInTransaction(Message& msg, LocalTransactionExecuter* tranExecuter, void* arg); + + /** + * DEFAULT SYNC ------------------------------------------------------- + */ + SendResult send(Message& msg); + SendResult send(Message& msg, int timeout); + + std::map<std::string, TopicPublishInfo> getTopicPublishInfoTable(); + + MQClientFactory* getMQClientFactory(); + + int getZipCompressLevel(); + void setZipCompressLevel(int zipCompressLevel); + + ServiceState getServiceState(); + void setServiceState(ServiceState serviceState); + + private: + SendResult sendSelectImpl(Message& msg, + MessageQueueSelector* selector, + void* pArg, + CommunicationMode communicationMode, + SendCallback* sendCallback, + int timeout); + + SendResult sendDefaultImpl(Message& msg, + CommunicationMode communicationMode, + SendCallback* pSendCallback, + int timeout); + + SendResult sendKernelImpl(Message& msg, + const MessageQueue& mq, + CommunicationMode communicationMode, + SendCallback* pSendCallback, + int timeout); + + void endTransaction(SendResult sendResult, + LocalTransactionState localTransactionState, + MQClientException localException); + + void makeSureStateOK(); + void checkConfig(); + + TopicPublishInfo& tryToFindTopicPublishInfo(const std::string& topic) ; + bool tryToCompressMessage(Message& msg); + + protected: + //TODO transaction imp + + private: + int m_zipCompressLevel;// message compress level, default is 5 + + DefaultMQProducer* m_pDefaultMQProducer; + + std::map<std::string, TopicPublishInfo> m_topicPublishInfoTable; + kpr::RWMutex m_topicPublishInfoTableLock; + + ServiceState m_serviceState; + MQClientFactory* m_pMQClientFactory; + std::list<SendMessageHook*> m_hookList; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h b/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h new file mode 100755 index 0000000..a124884 --- /dev/null +++ b/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __LOCALTRANSACTIONEXECUTER_H__ +#define __LOCALTRANSACTIONEXECUTER_H__ + +#include "SendResult.h" + +namespace rmq +{ + class LocalTransactionExecuter + { + public: + virtual~LocalTransactionExecuter() {} + virtual LocalTransactionState executeLocalTransactionBranch(Message& msg, void* arg) = 0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/MQProducerInner.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/producer/MQProducerInner.h b/rocketmq-client4cpp/src/producer/MQProducerInner.h new file mode 100755 index 0000000..56194dc --- /dev/null +++ b/rocketmq-client4cpp/src/producer/MQProducerInner.h @@ -0,0 +1,44 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __MQPRODUCERINNER_H__ +#define __MQPRODUCERINNER_H__ + +#include <string> +#include <set> + +namespace rmq +{ + class TransactionCheckListener; + class MessageExt; + class CheckTransactionStateRequestHeader; + class TopicPublishInfo; + + class MQProducerInner + { + public: + virtual ~MQProducerInner() {} + virtual std::set<std::string> getPublishTopicList() = 0; + virtual bool isPublishTopicNeedUpdate(const std::string& topic) = 0; + virtual TransactionCheckListener* checkListener() = 0; + virtual void checkTransactionState(const std::string& addr, // + const MessageExt& msg, // + const CheckTransactionStateRequestHeader& checkRequestHeader) = 0; + virtual void updateTopicPublishInfo(const std::string& topic, TopicPublishInfo& info) = 0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/MessageQueueSelector.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/producer/MessageQueueSelector.h b/rocketmq-client4cpp/src/producer/MessageQueueSelector.h new file mode 100755 index 0000000..6d5ac48 --- /dev/null +++ b/rocketmq-client4cpp/src/producer/MessageQueueSelector.h @@ -0,0 +1,96 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __MESSAGEQUEUESELECTOR_H__ +#define __MESSAGEQUEUESELECTOR_H__ + +#include <stdlib.h> +#include <stdio.h> +#include <time.h> +#include <math.h> +#include <set> +#include <string> +#include <vector> + +#include "MessageQueue.h" +#include "UtilAll.h" + +namespace rmq +{ + class Message; + + class MessageQueueSelector + { + public: + virtual ~MessageQueueSelector() {} + virtual MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg) = 0; + }; + + class SelectMessageQueueByRandoom : public MessageQueueSelector + { + public: + MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg) + { + srand((unsigned)time(NULL)); + int value = rand(); + value = value % mqs.size(); + return &(mqs.at(value)); + } + }; + + class SelectMessageQueueByHash : public MessageQueueSelector + { + public: + MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg) + { + std::string* sArg = (std::string*)arg; + int value = UtilAll::hashCode(sArg->c_str(), sArg->size()); + if (value < 0) + { + value = abs(value); + } + + value = value % mqs.size(); + return &(mqs.at(value)); + } + }; + + + class SelectMessageQueueByMachineRoom : public MessageQueueSelector + { + public: + MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg) + { + // TODO Auto-generated method stub + return NULL; + } + + std::set<std::string> getConsumeridcs() + { + return m_consumeridcs; + } + + void setConsumeridcs(const std::set<std::string>& consumeridcs) + { + m_consumeridcs = consumeridcs; + } + + private: + std::set<std::string> m_consumeridcs; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp new file mode 100755 index 0000000..573db95 --- /dev/null +++ b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp @@ -0,0 +1,101 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "ProducerInvokeCallback.h" +#include "ResponseFuture.h" +#include "SendResult.h" +#include "MQClientAPIImpl.h" +#include "SendCallback.h" +#include "MQClientException.h" +#include "RemotingCommand.h" + +namespace rmq +{ + +ProducerInvokeCallback::ProducerInvokeCallback(SendCallback* pSendCallBack, + MQClientAPIImpl* pMQClientAPIImpl, + const std::string& topic, + const std::string& brokerName) + : m_pSendCallBack(pSendCallBack), + m_pMQClientAPIImpl(pMQClientAPIImpl), + m_topic(topic), + m_brokerName(brokerName) +{ +} + +ProducerInvokeCallback::~ProducerInvokeCallback() +{ + if (m_pSendCallBack) + { + delete m_pSendCallBack; + m_pSendCallBack = NULL; + } +} + +void ProducerInvokeCallback::operationComplete(ResponseFuturePtr pResponseFuture) +{ + if (m_pSendCallBack == NULL) + { + delete this; + return; + } + + RemotingCommand* response = pResponseFuture->getResponseCommand(); + if (response != NULL) + { + try + { + SendResult* sendResult = + m_pMQClientAPIImpl->processSendResponse(m_brokerName, m_topic, response); + + assert(sendResult != NULL); + m_pSendCallBack->onSuccess(*sendResult); + + delete sendResult; + } + catch (MQException& e) + { + m_pSendCallBack->onException(e); + } + + delete response; + } + else + { + if (!pResponseFuture->isSendRequestOK()) + { + std::string msg = "send request failed"; + MQClientException e(msg, -1, __FILE__, __LINE__); + m_pSendCallBack->onException(e); + } + else if (pResponseFuture->isTimeout()) + { + std::string msg = RocketMQUtil::str2fmt("wait response timeout %lld ms", + pResponseFuture->getTimeoutMillis()); + MQClientException e(msg, -1, __FILE__, __LINE__); + m_pSendCallBack->onException(e); + } + else + { + std::string msg = "unknow reseaon"; + MQClientException e(msg, -1, __FILE__, __LINE__); + m_pSendCallBack->onException(e); + } + } + + delete this; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h new file mode 100755 index 0000000..d2c9825 --- /dev/null +++ b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h @@ -0,0 +1,46 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __PRODUCERINVOKECALLBACK_H__ +#define __PRODUCERINVOKECALLBACK_H__ + +#include <string> +#include "InvokeCallback.h" + +namespace rmq +{ + class MQClientAPIImpl; + class SendCallback; + + class ProducerInvokeCallback : public InvokeCallback + { + public: + ProducerInvokeCallback(SendCallback* pSendCallBack, + MQClientAPIImpl* pMQClientAPIImpl, + const std::string& topic, + const std::string& brokerName); + virtual ~ProducerInvokeCallback(); + virtual void operationComplete(ResponseFuturePtr pResponseFuture); + + private: + SendCallback* m_pSendCallBack; + MQClientAPIImpl* m_pMQClientAPIImpl; + std::string m_topic; + std::string m_brokerName; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/TopicPublishInfo.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/producer/TopicPublishInfo.h b/rocketmq-client4cpp/src/producer/TopicPublishInfo.h new file mode 100755 index 0000000..0d85b5f --- /dev/null +++ b/rocketmq-client4cpp/src/producer/TopicPublishInfo.h @@ -0,0 +1,141 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __TOPICPUBLISHINFO_H__ +#define __TOPICPUBLISHINFO_H__ + +#include <list> +#include <vector> +#include <string> +#include <sstream> +#include <math.h> +#include <stdlib.h> + +#include "RocketMQClient.h" +#include "RefHandle.h" +#include "MessageQueue.h" +#include "AtomicValue.h" +#include "UtilAll.h" + + +namespace rmq +{ + class TopicPublishInfo : public kpr::RefCount + { + public: + TopicPublishInfo() + { + m_orderTopic = false; + m_haveTopicRouterInfo = false; + } + + ~TopicPublishInfo() + { + m_messageQueueList.clear(); + } + + bool isOrderTopic() + { + return m_orderTopic; + } + + bool ok() + { + return !m_messageQueueList.empty(); + } + + void setOrderTopic(bool orderTopic) + { + m_orderTopic = orderTopic; + } + + std::vector<MessageQueue>& getMessageQueueList() + { + return m_messageQueueList; + } + + void setMessageQueueList(const std::vector<MessageQueue>& messageQueueList) + { + m_messageQueueList = messageQueueList; + } + + kpr::AtomicInteger& getSendWhichQueue() + { + return m_sendWhichQueue; + } + + void setSendWhichQueue(kpr::AtomicInteger& sendWhichQueue) + { + m_sendWhichQueue = sendWhichQueue; + } + + bool isHaveTopicRouterInfo() + { + return m_haveTopicRouterInfo; + } + + + void setHaveTopicRouterInfo(bool haveTopicRouterInfo) + { + m_haveTopicRouterInfo = haveTopicRouterInfo; + } + + MessageQueue* selectOneMessageQueue(const std::string lastBrokerName) + { + if (!lastBrokerName.empty()) + { + int index = m_sendWhichQueue++; + for (size_t i = 0; i < m_messageQueueList.size(); i++) + { + int pos = abs(index++) % m_messageQueueList.size(); + MessageQueue& mq = m_messageQueueList.at(pos); + if (mq.getBrokerName() != lastBrokerName) + { + return &mq; + } + } + + return NULL; + } + else + { + int index = m_sendWhichQueue++; + int pos = abs(index) % m_messageQueueList.size(); + return &(m_messageQueueList.at(pos)); + } + } + + std::string toString() const + { + std::stringstream ss; + ss << "{orderTopic=" << m_orderTopic + << ",messageQueueList=" << UtilAll::toString(m_messageQueueList) + << ",sendWhichQueue=" << m_sendWhichQueue + << ",haveTopicRouterInfo=" << m_haveTopicRouterInfo + << "}"; + return ss.str(); + } + + private: + bool m_orderTopic; + std::vector<MessageQueue> m_messageQueueList; + kpr::AtomicInteger m_sendWhichQueue; + bool m_haveTopicRouterInfo; + }; + typedef kpr::RefHandleT<TopicPublishInfo> TopicPublishInfoPtr; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/TransactionCheckListener.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/producer/TransactionCheckListener.h b/rocketmq-client4cpp/src/producer/TransactionCheckListener.h new file mode 100755 index 0000000..8955742 --- /dev/null +++ b/rocketmq-client4cpp/src/producer/TransactionCheckListener.h @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __TRANSACTIONCHECKLISTENER_H__ +#define __TRANSACTIONCHECKLISTENER_H__ + +#include "SendResult.h" + +namespace rmq +{ + class TransactionCheckListener + { + public: + virtual ~TransactionCheckListener() {} + virtual LocalTransactionState checkLocalTransactionState(MessageExt* pMsg) = 0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/TransactionMQProducer.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/producer/TransactionMQProducer.h b/rocketmq-client4cpp/src/producer/TransactionMQProducer.h new file mode 100755 index 0000000..bee11a5 --- /dev/null +++ b/rocketmq-client4cpp/src/producer/TransactionMQProducer.h @@ -0,0 +1,118 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __TRANSACTIONMQPRODUCER_H__ +#define __TRANSACTIONMQPRODUCER_H__ + +#include "DefaultMQProducer.h" +#include "DefaultMQProducerImpl.h" +#include "MQClientException.h" + +namespace rmq +{ + class TransactionMQProducer : public DefaultMQProducer + { + public: + TransactionMQProducer() + : m_pTransactionCheckListener(NULL), + m_checkThreadPoolMinSize(1), + m_checkThreadPoolMaxSize(1), + m_checkRequestHoldMax(2000) + { + + } + + TransactionMQProducer(const std::string& producerGroup) + : DefaultMQProducer(producerGroup), + m_pTransactionCheckListener(NULL), + m_checkThreadPoolMinSize(1), + m_checkThreadPoolMaxSize(1), + m_checkRequestHoldMax(2000) + { + + } + + void start() + { + m_pDefaultMQProducerImpl->initTransactionEnv(); + DefaultMQProducer::start(); + } + + void shutdown() + { + DefaultMQProducer::shutdown(); + m_pDefaultMQProducerImpl->destroyTransactionEnv(); + } + + TransactionSendResult sendMessageInTransaction(const Message& msg, + LocalTransactionExecuter* tranExecuter, void* arg) + { + if (NULL == m_pTransactionCheckListener) + { + THROW_MQEXCEPTION("localTransactionBranchCheckListener is null", -1); + } + + return m_pDefaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg); + } + + TransactionCheckListener* getTransactionCheckListener() + { + return m_pTransactionCheckListener; + } + + void setTransactionCheckListener(TransactionCheckListener* pTransactionCheckListener) + { + m_pTransactionCheckListener = pTransactionCheckListener; + } + + int getCheckThreadPoolMinSize() + { + return m_checkThreadPoolMinSize; + } + + void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) + { + m_checkThreadPoolMinSize = checkThreadPoolMinSize; + } + + int getCheckThreadPoolMaxSize() + { + return m_checkThreadPoolMaxSize; + } + + void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) + { + m_checkThreadPoolMaxSize = checkThreadPoolMaxSize; + } + + int getCheckRequestHoldMax() + { + return m_checkRequestHoldMax; + } + + void setCheckRequestHoldMax(int checkRequestHoldMax) + { + m_checkRequestHoldMax = checkRequestHoldMax; + } + + private: + TransactionCheckListener* m_pTransactionCheckListener; + int m_checkThreadPoolMinSize; + int m_checkThreadPoolMaxSize; + int m_checkRequestHoldMax; + }; +} + +#endif
