http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientAPIImpl.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/MQClientAPIImpl.cpp b/rocketmq-client4cpp/src/MQClientAPIImpl.cpp new file mode 100755 index 0000000..fa5a2b9 --- /dev/null +++ b/rocketmq-client4cpp/src/MQClientAPIImpl.cpp @@ -0,0 +1,1323 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include <assert.h> + +#include "MQClientAPIImpl.h" +#include "MQClientException.h" +#include "SocketUtil.h" +#include "UtilAll.h" +#include "TcpRemotingClient.h" +#include "MQProtos.h" +#include "PullResultExt.h" +#include "ConsumerInvokeCallback.h" +#include "NamesrvUtil.h" +#include "VirtualEnvUtil.h" +#include "ClientRemotingProcessor.h" +#include "CommandCustomHeader.h" +#include "TopicList.h" +#include "ProducerInvokeCallback.h" +#include "MessageDecoder.h" +#include "MessageSysFlag.h" +#include "GetConsumerListByGroupResponseBody.h" + + +namespace rmq +{ + + +MQClientAPIImpl::MQClientAPIImpl(ClientConfig& clientConfig, + const RemoteClientConfig& remoteClientConfig, + ClientRemotingProcessor* pClientRemotingProcessor) + : m_pClientRemotingProcessor(pClientRemotingProcessor) +{ + m_pRemotingClient = new TcpRemotingClient(remoteClientConfig); + + m_pRemotingClient->registerProcessor(CHECK_TRANSACTION_STATE_VALUE, m_pClientRemotingProcessor); + m_pRemotingClient->registerProcessor(NOTIFY_CONSUMER_IDS_CHANGED_VALUE, m_pClientRemotingProcessor); + m_pRemotingClient->registerProcessor(RESET_CONSUMER_CLIENT_OFFSET_VALUE, m_pClientRemotingProcessor); + m_pRemotingClient->registerProcessor(GET_CONSUMER_STATUS_FROM_CLIENT_VALUE, m_pClientRemotingProcessor); + m_pRemotingClient->registerProcessor(GET_CONSUMER_RUNNING_INFO_VALUE, m_pClientRemotingProcessor); + m_pRemotingClient->registerProcessor(CONSUME_MESSAGE_DIRECTLY_VALUE, m_pClientRemotingProcessor); +} + +MQClientAPIImpl::~MQClientAPIImpl() +{ +} + +std::string MQClientAPIImpl::getProjectGroupPrefix() +{ + return m_projectGroupPrefix; +} + +std::vector<std::string> MQClientAPIImpl::getNameServerAddressList() +{ + return m_pRemotingClient->getNameServerAddressList(); +} + +TcpRemotingClient* MQClientAPIImpl::getRemotingClient() +{ + return m_pRemotingClient; +} + +std::string MQClientAPIImpl::fetchNameServerAddr() +{ + try + { + std::string addrs = m_topAddressing.fetchNSAddr(); + if (!addrs.empty()) + { + if (addrs != m_nameSrvAddr) + { + RMQ_INFO("name server address changed, %s -> %s", + m_nameSrvAddr.c_str(), addrs.c_str()); + updateNameServerAddressList(addrs); + m_nameSrvAddr = addrs; + return m_nameSrvAddr; + } + } + } + catch (...) + { + RMQ_ERROR("fetchNameServerAddr Exception"); + } + + return m_nameSrvAddr; +} + +void MQClientAPIImpl::updateNameServerAddressList(const std::string& addrs) +{ + m_nameSrvAddr = addrs; + std::vector<std::string> av; + UtilAll::Split(av, addrs, ";"); + if (av.size() > 0) + { + m_pRemotingClient->updateNameServerAddressList(av); + } +} + +void MQClientAPIImpl::start() +{ + m_pRemotingClient->start(); + + try + { + std::string localAddress = getLocalAddress(); + m_projectGroupPrefix = getProjectGroupByIp(localAddress, 3000); + } + catch (std::exception e) + { + } +} + +void MQClientAPIImpl::shutdown() +{ + m_pRemotingClient->shutdown(); +} + +void MQClientAPIImpl::createSubscriptionGroup(const std::string& addr, + SubscriptionGroupConfig config, + int timeoutMillis) +{ + //TODO +} + + +void MQClientAPIImpl::createTopic(const std::string& addr, + const std::string& defaultTopic, + TopicConfig topicConfig, + int timeoutMillis) +{ + std::string topicWithProjectGroup = topicConfig.getTopicName(); + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + topicWithProjectGroup = + VirtualEnvUtil::buildWithProjectGroup(topicConfig.getTopicName(), m_projectGroupPrefix); + } + + CreateTopicRequestHeader* requestHeader = new CreateTopicRequestHeader(); + requestHeader->topic = (topicWithProjectGroup); + requestHeader->defaultTopic = (defaultTopic); + requestHeader->readQueueNums = (topicConfig.getReadQueueNums()); + requestHeader->writeQueueNums = (topicConfig.getWriteQueueNums()); + requestHeader->perm = (topicConfig.getPerm()); + requestHeader->topicFilterType = (topicConfig.getTopicFilterType()); + requestHeader->topicSysFlag = (topicConfig.getTopicSysFlag()); + requestHeader->order = (topicConfig.isOrder()); + + RemotingCommandPtr request = + RemotingCommand::createRequestCommand(UPDATE_AND_CREATE_TOPIC_VALUE, requestHeader); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + return; + } + default: + break; + } + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + THROW_MQEXCEPTION(MQClientException, "createTopic failed", -1); +} + +SendResult MQClientAPIImpl::sendMessage(const std::string& addr, + const std::string& brokerName, + Message& msg, + SendMessageRequestHeader* pRequestHeader, + int timeoutMillis, + CommunicationMode communicationMode, + SendCallback* pSendCallback) +{ + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + msg.setTopic(VirtualEnvUtil::buildWithProjectGroup(msg.getTopic(), m_projectGroupPrefix)); + pRequestHeader->producerGroup = (VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->producerGroup, + m_projectGroupPrefix)); + pRequestHeader->topic = (VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, + m_projectGroupPrefix)); + } + + bool sendSmartMsg = true; + RemotingCommandPtr request = NULL; + if (sendSmartMsg) + { + SendMessageRequestHeaderV2* pRequestHeaderV2 = SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(pRequestHeader); + request = RemotingCommand::createRequestCommand(SEND_MESSAGE_V2_VALUE, pRequestHeaderV2); + delete pRequestHeader; + } + else + { + request = RemotingCommand::createRequestCommand(SEND_MESSAGE_VALUE, pRequestHeader); + } + + if (msg.getCompressBody() != NULL) + { + request->setBody((char*)msg.getCompressBody(), msg.getCompressBodyLen(), false); + } + else + { + request->setBody((char*)msg.getBody(), msg.getBodyLen(), false); + } + + SendResult result; + switch (communicationMode) + { + case ONEWAY: + m_pRemotingClient->invokeOneway(addr, request, timeoutMillis); + return result; + case ASYNC: + sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, pSendCallback); + return result; + case SYNC: + { + SendResult* r = sendMessageSync(addr, brokerName, msg, timeoutMillis, request); + if (r) + { + result = *r; + delete r; + } + return result; + } + default: + break; + } + return result; +} + +PullResult* MQClientAPIImpl::pullMessage(const std::string& addr, + PullMessageRequestHeader* pRequestHeader, + int timeoutMillis, + CommunicationMode communicationMode, + PullCallback* pPullCallback) +{ + + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + pRequestHeader->consumerGroup = (VirtualEnvUtil::buildWithProjectGroup( + pRequestHeader->consumerGroup, m_projectGroupPrefix)); + pRequestHeader->topic = (VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, + m_projectGroupPrefix)); + } + + RemotingCommandPtr request = RemotingCommand::createRequestCommand(PULL_MESSAGE_VALUE, pRequestHeader); + + PullResult* result = NULL; + switch (communicationMode) + { + case ONEWAY: + break; + case ASYNC: + pullMessageAsync(addr, request, timeoutMillis, pPullCallback); + break; + case SYNC: + result = pullMessageSync(addr, request, timeoutMillis); + break; + default: + assert(false); + break; + } + + return result; +} + +MessageExt* MQClientAPIImpl::viewMessage(const std::string& addr, long long phyoffset, int timeoutMillis) +{ + ViewMessageRequestHeader* requestHeader = new ViewMessageRequestHeader(); + requestHeader->offset = phyoffset; + + RemotingCommandPtr request = + RemotingCommand::createRequestCommand(VIEW_MESSAGE_BY_ID_VALUE, requestHeader); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + if (response->getBody() != NULL) + { + int len = 0; + MessageExt* messageExt = MessageDecoder::decode((char*)response->getBody(), + response->getBodyLen(), len); + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + messageExt->setTopic(VirtualEnvUtil::clearProjectGroup(messageExt->getTopic(), + m_projectGroupPrefix)); + } + return messageExt; + } + } + default: + break; + } + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + THROW_MQEXCEPTION(MQClientException, "viewMessage failed", -1); +} + +long long MQClientAPIImpl::searchOffset(const std::string& addr, + const std::string& topic, + int queueId, + long long timestamp, + int timeoutMillis) +{ + std::string topicWithProjectGroup = topic; + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix); + } + + SearchOffsetRequestHeader* pRequestHeader = new SearchOffsetRequestHeader(); + pRequestHeader->topic = topicWithProjectGroup; + pRequestHeader->queueId = queueId; + pRequestHeader->timestamp = timestamp; + + RemotingCommandPtr request = + RemotingCommand::createRequestCommand(SEARCH_OFFSET_BY_TIMESTAMP_VALUE, pRequestHeader); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + SearchOffsetResponseHeader* ret = (SearchOffsetResponseHeader*)response->getCommandCustomHeader(); + return ret->offset; + } + default: + break; + } + //THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + //THROW_MQEXCEPTION(MQClientException, "searchOffset failed", -1); + return -1; +} + +long long MQClientAPIImpl::getMaxOffset(const std::string& addr, + const std::string& topic, + int queueId, + int timeoutMillis) +{ + std::string topicWithProjectGroup = topic; + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix); + } + + GetMaxOffsetRequestHeader* pRequestHeader = new GetMaxOffsetRequestHeader(); + pRequestHeader->topic = topicWithProjectGroup; + pRequestHeader->queueId = queueId; + + RemotingCommandPtr request = + RemotingCommand::createRequestCommand(GET_MAX_OFFSET_VALUE, pRequestHeader); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + GetMaxOffsetResponseHeader* ret = (GetMaxOffsetResponseHeader*)response->getCommandCustomHeader(); + return ret->offset; + } + default: + break; + } + //THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + //THROW_MQEXCEPTION(MQClientException, "getMaxOffset failed", -1); + return -1; +} + + +std::list<std::string> MQClientAPIImpl::getConsumerIdListByGroup(const std::string& addr, + const std::string& consumerGroup, + int timeoutMillis) +{ + std::string consumerGroupWithProjectGroup = consumerGroup; + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + consumerGroupWithProjectGroup = + VirtualEnvUtil::buildWithProjectGroup(consumerGroup, m_projectGroupPrefix); + } + + GetConsumerListByGroupRequestHeader* requestHeader = new GetConsumerListByGroupRequestHeader(); + requestHeader->consumerGroup = consumerGroupWithProjectGroup; + + RemotingCommandPtr request = + RemotingCommand::createRequestCommand(GET_CONSUMER_LIST_BY_GROUP_VALUE, requestHeader); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + if (response->getBody() != NULL) + { + GetConsumerListByGroupResponseBody* body = + GetConsumerListByGroupResponseBody::decode((char*)response->getBody(), response->getBodyLen()); + std::list<std::string> ret = body->getConsumerIdList(); + delete body; + return ret; + } + } + default: + break; + } + + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + THROW_MQEXCEPTION(MQClientException, "getConsumerIdListByGroup failed", -1); +} + +long long MQClientAPIImpl::getMinOffset(const std::string& addr, + const std::string& topic, + int queueId, + int timeoutMillis) +{ + std::string topicWithProjectGroup = topic; + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix); + } + + GetMinOffsetRequestHeader* pRequestHeader = new GetMinOffsetRequestHeader(); + pRequestHeader->topic = topicWithProjectGroup; + pRequestHeader->queueId = queueId; + + RemotingCommandPtr request = + RemotingCommand::createRequestCommand(GET_MIN_OFFSET_VALUE, pRequestHeader); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + GetMinOffsetResponseHeader* ret = (GetMinOffsetResponseHeader*)response->getCommandCustomHeader(); + return ret->offset; + } + default: + break; + } + //THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + //THROW_MQEXCEPTION(MQClientException, "getMinOffset failed", -1); + return -1; +} + +long long MQClientAPIImpl::getEarliestMsgStoretime(const std::string& addr, + const std::string& topic, + int queueId, + int timeoutMillis) +{ + std::string topicWithProjectGroup = topic; + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix); + } + + GetEarliestMsgStoretimeRequestHeader* pRequestHeader = new GetEarliestMsgStoretimeRequestHeader(); + pRequestHeader->topic = topicWithProjectGroup; + pRequestHeader->queueId = queueId; + + RemotingCommandPtr request = + RemotingCommand::createRequestCommand(GET_EARLIEST_MSG_STORETIME_VALUE, pRequestHeader); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + GetEarliestMsgStoretimeResponseHeader* ret = (GetEarliestMsgStoretimeResponseHeader*)response->getCommandCustomHeader(); + return ret->timestamp; + } + default: + break; + } + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + THROW_MQEXCEPTION(MQClientException, "getEarliestMsgStoretime failed", -1); +} + +long long MQClientAPIImpl::queryConsumerOffset(const std::string& addr, + QueryConsumerOffsetRequestHeader* pRequestHeader, + int timeoutMillis) +{ + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + pRequestHeader->consumerGroup = VirtualEnvUtil::buildWithProjectGroup( + pRequestHeader->consumerGroup, m_projectGroupPrefix); + pRequestHeader->topic = VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, + m_projectGroupPrefix); + } + + RemotingCommandPtr request = + RemotingCommand::createRequestCommand(QUERY_CONSUMER_OFFSET_VALUE, pRequestHeader); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + QueryConsumerOffsetResponseHeader* ret = (QueryConsumerOffsetResponseHeader*)response->getCommandCustomHeader(); + long long offset = ret->offset; + return offset; + } + default: + break; + } + THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode()); + } + + THROW_MQEXCEPTION(MQClientException, "queryConsumerOffset failed", -1); + return -1; +} + +void MQClientAPIImpl::updateConsumerOffset(const std::string& addr, + UpdateConsumerOffsetRequestHeader* pRequestHeader, + int timeoutMillis) +{ + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + pRequestHeader->consumerGroup = VirtualEnvUtil::buildWithProjectGroup( + pRequestHeader->consumerGroup, m_projectGroupPrefix); + pRequestHeader->topic = VirtualEnvUtil::buildWithProjectGroup( + pRequestHeader->topic, m_projectGroupPrefix); + } + + RemotingCommandPtr request = RemotingCommand::createRequestCommand(UPDATE_CONSUMER_OFFSET_VALUE, pRequestHeader); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + return; + } + default: + break; + } + + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + THROW_MQEXCEPTION(MQClientException, "updateConsumerOffset failed", -1); +} + +void MQClientAPIImpl::updateConsumerOffsetOneway(const std::string& addr, + UpdateConsumerOffsetRequestHeader* pRequestHeader, + int timeoutMillis) +{ + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + pRequestHeader->consumerGroup = VirtualEnvUtil::buildWithProjectGroup( + pRequestHeader->consumerGroup, m_projectGroupPrefix); + pRequestHeader->topic = VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, + m_projectGroupPrefix); + } + + RemotingCommandPtr request = + RemotingCommand::createRequestCommand(UPDATE_CONSUMER_OFFSET_VALUE, pRequestHeader); + + m_pRemotingClient->invokeOneway(addr, request, timeoutMillis); +} + +void MQClientAPIImpl::sendHearbeat(const std::string& addr, HeartbeatData* pHeartbeatData, int timeoutMillis) +{ + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + std::set<ConsumerData>& consumerDatas = pHeartbeatData->getConsumerDataSet(); + std::set<ConsumerData>::iterator it = consumerDatas.begin(); + for (; it != consumerDatas.end(); it++) + { + ConsumerData& consumerData = (ConsumerData&)(*it); + consumerData.groupName = VirtualEnvUtil::buildWithProjectGroup(consumerData.groupName, + m_projectGroupPrefix); + + std::set<SubscriptionData>& subscriptionDatas = consumerData.subscriptionDataSet; + std::set<SubscriptionData>::iterator itsub = subscriptionDatas.begin(); + for (; itsub != subscriptionDatas.end(); itsub++) + { + SubscriptionData& subscriptionData = (SubscriptionData&)(*itsub); + subscriptionData.setTopic(VirtualEnvUtil::buildWithProjectGroup( + subscriptionData.getTopic(), m_projectGroupPrefix)); + } + } + + std::set<ProducerData>& producerDatas = pHeartbeatData->getProducerDataSet(); + std::set<ProducerData>::iterator itp = producerDatas.begin(); + for (; itp != producerDatas.end(); itp++) + { + ProducerData& producerData = (ProducerData&)(*itp); + producerData.groupName = VirtualEnvUtil::buildWithProjectGroup(producerData.groupName, + m_projectGroupPrefix); + } + } + + RemotingCommandPtr request = RemotingCommand::createRequestCommand(HEART_BEAT_VALUE, NULL); + + std::string body; + pHeartbeatData->encode(body); + request->setBody((char*)body.data(), body.length(), true); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + return; + } + default: + break; + } + + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + THROW_MQEXCEPTION(MQClientException, "sendHearbeat failed", -1); +} + +void MQClientAPIImpl::unregisterClient(const std::string& addr, + const std::string& clientID, + const std::string& producerGroup, + const std::string& consumerGroup, + int timeoutMillis) +{ + std::string producerGroupWithProjectGroup = producerGroup; + std::string consumerGroupWithProjectGroup = consumerGroup; + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + producerGroupWithProjectGroup = + VirtualEnvUtil::buildWithProjectGroup(producerGroup, m_projectGroupPrefix); + consumerGroupWithProjectGroup = + VirtualEnvUtil::buildWithProjectGroup(consumerGroup, m_projectGroupPrefix); + } + + UnregisterClientRequestHeader* requestHeader = new UnregisterClientRequestHeader(); + requestHeader->clientID = (clientID); + requestHeader->producerGroup = (producerGroupWithProjectGroup); + requestHeader->consumerGroup = (consumerGroupWithProjectGroup); + + RemotingCommandPtr request = + RemotingCommand::createRequestCommand(UNREGISTER_CLIENT_VALUE, requestHeader); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + return; + default: + break; + } + + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + THROW_MQEXCEPTION(MQClientException, "unregisterClient failed", -1); +} + +void MQClientAPIImpl::endTransactionOneway(const std::string& addr, + EndTransactionRequestHeader* pRequestHeader, + const std::string& remark, + int timeoutMillis) +{ + //TODO +} + +void MQClientAPIImpl::queryMessage(const std::string& addr, + QueryMessageRequestHeader* pRequestHeader, + int timeoutMillis, + InvokeCallback* pInvokeCallback) +{ + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + pRequestHeader->topic = VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, + m_projectGroupPrefix); + } + + RemotingCommandPtr request = + RemotingCommand::createRequestCommand(QUERY_MESSAGE_VALUE, pRequestHeader); + + m_pRemotingClient->invokeAsync(addr, request, timeoutMillis, pInvokeCallback); + return; +} + +bool MQClientAPIImpl::registerClient(const std::string& addr, HeartbeatData& heartbeat, int timeoutMillis) +{ + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + std::set<ConsumerData>& consumerDatas = heartbeat.getConsumerDataSet(); + std::set<ConsumerData>::iterator it = consumerDatas.begin(); + + for (; it != consumerDatas.end(); it++) + { + ConsumerData& consumerData = (ConsumerData&)(*it); + + consumerData.groupName = VirtualEnvUtil::buildWithProjectGroup(consumerData.groupName, + m_projectGroupPrefix); + std::set<SubscriptionData>& subscriptionDatas = consumerData.subscriptionDataSet; + std::set<SubscriptionData>::iterator itsub = subscriptionDatas.begin(); + + for (; itsub != subscriptionDatas.end(); itsub++) + { + SubscriptionData& subscriptionData = (SubscriptionData&)(*itsub); + subscriptionData.setTopic(VirtualEnvUtil::buildWithProjectGroup( + subscriptionData.getTopic(), m_projectGroupPrefix)); + } + } + + std::set<ProducerData>& producerDatas = heartbeat.getProducerDataSet(); + std::set<ProducerData>::iterator itp = producerDatas.begin(); + for (; itp != producerDatas.end(); itp++) + { + ProducerData& producerData = (ProducerData&)(*itp); + producerData.groupName = VirtualEnvUtil::buildWithProjectGroup(producerData.groupName, + m_projectGroupPrefix); + } + } + + RemotingCommandPtr request = RemotingCommand::createRequestCommand(HEART_BEAT_VALUE, NULL); + + std::string body; + heartbeat.encode(body); + + request->setBody((char*)body.data(), body.length(), true); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + return (response && response->getCode() == SUCCESS_VALUE); +} + +void MQClientAPIImpl::consumerSendMessageBack( + const std::string& addr, + MessageExt& msg, + const std::string& consumerGroup, + int delayLevel, + int timeoutMillis) +{ + std::string consumerGroupWithProjectGroup = consumerGroup; + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + consumerGroupWithProjectGroup = + VirtualEnvUtil::buildWithProjectGroup(consumerGroup, m_projectGroupPrefix); + msg.setTopic(VirtualEnvUtil::buildWithProjectGroup(msg.getTopic(), m_projectGroupPrefix)); + } + + ConsumerSendMsgBackRequestHeader* requestHeader = new ConsumerSendMsgBackRequestHeader(); + requestHeader->group = consumerGroupWithProjectGroup; + requestHeader->offset = msg.getCommitLogOffset(); + requestHeader->delayLevel = delayLevel; + + RemotingCommandPtr request = RemotingCommand::createRequestCommand(CONSUMER_SEND_MSG_BACK_VALUE, requestHeader); + + std::string brokerAddr = addr.empty() ? socketAddress2IPPort(msg.getStoreHost()) : addr; + RemotingCommandPtr response = m_pRemotingClient->invokeSync(brokerAddr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + return; + break; + default: + break; + } + + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + THROW_MQEXCEPTION(MQClientException, "consumerSendMessageBack failed", -1); +} + +std::set<MessageQueue> MQClientAPIImpl::lockBatchMQ(const std::string& addr, + LockBatchRequestBody* pRequestBody, + int timeoutMillis) +{ + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + pRequestBody->setConsumerGroup((VirtualEnvUtil::buildWithProjectGroup( + pRequestBody->getConsumerGroup(), m_projectGroupPrefix))); + std::set<MessageQueue>& messageQueues = pRequestBody->getMqSet(); + std::set<MessageQueue>::iterator it = messageQueues.begin(); + + for (; it != messageQueues.end(); it++) + { + MessageQueue& messageQueue = (MessageQueue&)(*it); + messageQueue.setTopic(VirtualEnvUtil::buildWithProjectGroup(messageQueue.getTopic(), + m_projectGroupPrefix)); + } + } + + RemotingCommandPtr request = RemotingCommand::createRequestCommand(LOCK_BATCH_MQ_VALUE, NULL); + + std::string body; + pRequestBody->encode(body); + request->setBody((char*)body.data(), body.length(), true); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + LockBatchResponseBody* responseBody = + LockBatchResponseBody::decode(response->getBody(), response->getBodyLen()); + std::set<MessageQueue> messageQueues = responseBody->getLockOKMQSet(); + + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + std::set<MessageQueue>::iterator it = messageQueues.begin(); + + for (; it != messageQueues.end(); it++) + { + MessageQueue& messageQueue = (MessageQueue&)(*it); + messageQueue.setTopic(VirtualEnvUtil::clearProjectGroup(messageQueue.getTopic(), + m_projectGroupPrefix)); + } + } + return messageQueues; + } + default: + break; + } + + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + THROW_MQEXCEPTION(MQClientException, "lockBatchMQ failed", -1); +} + +void MQClientAPIImpl::unlockBatchMQ(const std::string& addr, + UnlockBatchRequestBody* pRequestBody, + int timeoutMillis, + bool oneway) +{ + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + pRequestBody->setConsumerGroup((VirtualEnvUtil::buildWithProjectGroup( + pRequestBody->getConsumerGroup(), m_projectGroupPrefix))); + std::set<MessageQueue>& messageQueues = pRequestBody->getMqSet(); + std::set<MessageQueue>::iterator it = messageQueues.begin(); + + for (; it != messageQueues.end(); it++) + { + MessageQueue& messageQueue = (MessageQueue&)(*it); + messageQueue.setTopic(VirtualEnvUtil::buildWithProjectGroup(messageQueue.getTopic(), + m_projectGroupPrefix)); + } + } + + RemotingCommandPtr request = RemotingCommand::createRequestCommand(UNLOCK_BATCH_MQ_VALUE, NULL); + + std::string body; + pRequestBody->encode(body); + request->setBody((char*)body.data(), body.length(), true); + + if (oneway) + { + m_pRemotingClient->invokeOneway(addr, request, timeoutMillis); + } + else + { + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + return; + default: + break; + } + + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + THROW_MQEXCEPTION(MQClientException, "unlockBatchMQ failed", -1); + } +} + +TopicStatsTable MQClientAPIImpl::getTopicStatsInfo(const std::string& addr, + const std::string& topic, + int timeoutMillis) +{ + //TODO + TopicStatsTable t; + return t; +} + +ConsumeStats MQClientAPIImpl::getConsumeStats(const std::string& addr, + const std::string& consumerGroup, + int timeoutMillis) +{ + //TODO + ConsumeStats cs; + return cs; +} + +ProducerConnection* MQClientAPIImpl::getProducerConnectionList(const std::string& addr, + const std::string& producerGroup, + int timeoutMillis) +{ + //TODO + return NULL; +} + +ConsumerConnection* MQClientAPIImpl::getConsumerConnectionList(const std::string& addr, + const std::string& consumerGroup, + int timeoutMillis) +{ + //TODO + return NULL; +} + +KVTable MQClientAPIImpl::getBrokerRuntimeInfo(const std::string& addr, int timeoutMillis) +{ + //TODO + KVTable kv; + return kv; +} + +void MQClientAPIImpl::updateBrokerConfig(const std::string& addr, + const std::map<std::string, std::string>& properties, + int timeoutMillis) +{ + //TODO +} + +ClusterInfo* MQClientAPIImpl::getBrokerClusterInfo(int timeoutMillis) +{ + //TODO + return NULL; +} + +TopicRouteData* MQClientAPIImpl::getDefaultTopicRouteInfoFromNameServer(const std::string& topic, + int timeoutMillis) +{ + GetRouteInfoRequestHeader* requestHeader = new GetRouteInfoRequestHeader(); + requestHeader->topic = topic; + + RemotingCommandPtr request = RemotingCommand::createRequestCommand(GET_ROUTEINTO_BY_TOPIC_VALUE, requestHeader); + RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case TOPIC_NOT_EXIST_VALUE: + { + // TODO LOG + break; + } + case SUCCESS_VALUE: + { + int bodyLen = response->getBodyLen(); + const char* body = response->getBody(); + if (body) + { + TopicRouteData* ret = TopicRouteData::encode(body, bodyLen); + return ret; + } + } + default: + break; + } + + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + return NULL; +} + +TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis) +{ + std::string topicWithProjectGroup = topic; + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix); + } + + GetRouteInfoRequestHeader* requestHeader = new GetRouteInfoRequestHeader(); + requestHeader->topic = topicWithProjectGroup; + + RemotingCommandPtr request = RemotingCommand::createRequestCommand(GET_ROUTEINTO_BY_TOPIC_VALUE, requestHeader); + RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case TOPIC_NOT_EXIST_VALUE: + { + if (topic != MixAll::DEFAULT_TOPIC) + { + RMQ_WARN("get Topic [{%s}] RouteInfoFromNameServer is not exist value", topic.c_str()); + } + break; + } + case SUCCESS_VALUE: + { + int bodyLen = response->getBodyLen(); + const char* body = response->getBody(); + if (body) + { + TopicRouteData* ret = TopicRouteData::encode(body, bodyLen); + return ret; + } + } + default: + break; + } + + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + return NULL; +} + +TopicList* MQClientAPIImpl::getTopicListFromNameServer(int timeoutMillis) +{ + RemotingCommandPtr request = RemotingCommand::createRequestCommand(GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE, NULL); + RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + char* body = (char*)response->getBody(); + if (body != NULL) + { + TopicList* topicList = TopicList::decode(body, response->getBodyLen()); + + if (!UtilAll::isBlank(m_projectGroupPrefix)) + { + std::set<std::string> newTopicSet; + + const std::set<std::string>& topics = topicList->getTopicList(); + std::set<std::string>::const_iterator it = topics.begin(); + for (; it != topics.end(); it++) + { + std::string topic = *it; + newTopicSet.insert(VirtualEnvUtil::clearProjectGroup(topic, m_projectGroupPrefix)); + } + + topicList->setTopicList(newTopicSet); + } + + return topicList; + } + } + default: + break; + } + + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + return NULL; +} + +int MQClientAPIImpl::wipeWritePermOfBroker(const std::string& namesrvAddr, + const std::string& brokerName, + int timeoutMillis) +{ + //TODO + return 0; +} + +void MQClientAPIImpl::deleteTopicInBroker(const std::string& addr, + const std::string& topic, + int timeoutMillis) +{ + //TODO +} + +void MQClientAPIImpl::deleteTopicInNameServer(const std::string& addr, + const std::string& topic, + int timeoutMillis) +{ + //TODO +} + +void MQClientAPIImpl::deleteSubscriptionGroup(const std::string& addr, + const std::string& groupName, + int timeoutMillis) +{ + //TODO +} + +std::string MQClientAPIImpl::getKVConfigValue(const std::string& projectNamespace, + const std::string& key, + int timeoutMillis) +{ + GetKVConfigRequestHeader* pRequestHeader = new GetKVConfigRequestHeader(); + pRequestHeader->namespace_ = projectNamespace; + pRequestHeader->key = key; + + RemotingCommandPtr request = + RemotingCommand::createRequestCommand(GET_KV_CONFIG_VALUE, pRequestHeader); + + RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis); + if (response) + { + switch (response->getCode()) + { + case SUCCESS_VALUE: + { + GetKVConfigResponseHeader* ret = (GetKVConfigResponseHeader*)response->getCommandCustomHeader(); + return ret->value; + } + default: + break; + } + THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode()); + } + + THROW_MQEXCEPTION(MQClientException, "getKVConfigValue failed", -1); +} + +void MQClientAPIImpl::putKVConfigValue(const std::string& projectNamespace, + const std::string& key, + const std::string& value, + int timeoutMillis) +{ + //TODO +} + +void MQClientAPIImpl::deleteKVConfigValue(const std::string& projectNamespace, + const std::string& key, + int timeoutMillis) +{ + //TODO +} + +std::string MQClientAPIImpl::getProjectGroupByIp(const std::string& ip, int timeoutMillis) +{ + return getKVConfigValue(NamesrvUtil::NAMESPACE_PROJECT_CONFIG, ip, timeoutMillis); +} + +std::string MQClientAPIImpl::getKVConfigByValue(const std::string& projectNamespace, + const std::string& projectGroup, + int timeoutMillis) +{ + //TODO + return ""; +} + +KVTable MQClientAPIImpl::getKVListByNamespace(const std::string& projectNamespace, int timeoutMillis) +{ + //TODO + return KVTable(); +} + +void MQClientAPIImpl::deleteKVConfigByValue(const std::string& projectNamespace, + const std::string& projectGroup, + int timeoutMillis) +{ + //TODO +} + +SendResult* MQClientAPIImpl::sendMessageSync(const std::string& addr, + const std::string& brokerName, + Message& msg, + int timeoutMillis, + RemotingCommand* request) +{ + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis); + return processSendResponse(brokerName, msg.getTopic(), response); +} + +void MQClientAPIImpl::sendMessageAsync(const std::string& addr, + const std::string& brokerName, + Message& msg, + int timeoutMillis, + RemotingCommand* request, + SendCallback* pSendCallback) +{ + ProducerInvokeCallback* callback = new ProducerInvokeCallback(pSendCallback, this, msg.getTopic(), brokerName); + m_pRemotingClient->invokeAsync(addr, request, timeoutMillis, callback); +} + +SendResult* MQClientAPIImpl::processSendResponse(const std::string& brokerName, + const std::string& topic, + RemotingCommand* pResponse) +{ + if (pResponse == NULL) + { + return NULL; + } + + switch (pResponse->getCode()) + { + case FLUSH_DISK_TIMEOUT_VALUE: + case FLUSH_SLAVE_TIMEOUT_VALUE: + case SLAVE_NOT_AVAILABLE_VALUE: + { + // TODO LOG + } + case SUCCESS_VALUE: + { + SendStatus sendStatus = SEND_OK; + switch (pResponse->getCode()) + { + case FLUSH_DISK_TIMEOUT_VALUE: + sendStatus = FLUSH_DISK_TIMEOUT; + break; + case FLUSH_SLAVE_TIMEOUT_VALUE: + sendStatus = FLUSH_SLAVE_TIMEOUT; + break; + case SLAVE_NOT_AVAILABLE_VALUE: + sendStatus = SLAVE_NOT_AVAILABLE; + break; + case SUCCESS_VALUE: + sendStatus = SEND_OK; + break; + default: + //assert false; + break; + } + + SendMessageResponseHeader* responseHeader = (SendMessageResponseHeader*)pResponse->getCommandCustomHeader(); + MessageQueue messageQueue(topic, brokerName, responseHeader->queueId); + SendResult* ret = new SendResult(sendStatus, responseHeader->msgId, messageQueue, + responseHeader->queueOffset, m_projectGroupPrefix); + + return ret; + } + default: + break; + } + + THROW_MQEXCEPTION(MQClientException, pResponse->getRemark(), pResponse->getCode()); +} + +void MQClientAPIImpl::pullMessageAsync(const std::string& addr, + RemotingCommand* pRequest, + int timeoutMillis, + PullCallback* pPullCallback) +{ + ConsumerInvokeCallback* callback = new ConsumerInvokeCallback(pPullCallback, this); + m_pRemotingClient->invokeAsync(addr, pRequest, timeoutMillis, callback); +} + +PullResult* MQClientAPIImpl::processPullResponse(RemotingCommand* pResponse) +{ + PullStatus pullStatus = NO_NEW_MSG; + switch (pResponse->getCode()) + { + case SUCCESS_VALUE: + pullStatus = FOUND; + break; + case PULL_NOT_FOUND_VALUE: + pullStatus = NO_NEW_MSG; + break; + case PULL_RETRY_IMMEDIATELY_VALUE: + pullStatus = NO_MATCHED_MSG; + break; + case PULL_OFFSET_MOVED_VALUE: + pullStatus = OFFSET_ILLEGAL; + break; + default: + THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(), pResponse->getCode()); + break; + } + + PullMessageResponseHeader* responseHeader = (PullMessageResponseHeader*) pResponse->getCommandCustomHeader(); + std::list<MessageExt*> msgFoundList; + return new PullResultExt(pullStatus, responseHeader->nextBeginOffset, + responseHeader->minOffset, responseHeader->maxOffset, msgFoundList, + responseHeader->suggestWhichBrokerId, pResponse->getBody(), pResponse->getBodyLen()); +} + +PullResult* MQClientAPIImpl::pullMessageSync(const std::string& addr, + RemotingCommand* pRequest, + int timeoutMillis) +{ + RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, pRequest, timeoutMillis); + PullResult* result = processPullResponse(response); + + response->setBody(NULL, 0, false); + return result; +} + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientAPIImpl.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/MQClientAPIImpl.h b/rocketmq-client4cpp/src/MQClientAPIImpl.h new file mode 100755 index 0000000..88defb5 --- /dev/null +++ b/rocketmq-client4cpp/src/MQClientAPIImpl.h @@ -0,0 +1,280 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __MQCLIENTAPIIMPL_H__ +#define __MQCLIENTAPIIMPL_H__ + +#include <string> +#include <map> +#include <list> +#include <set> + +#include "ClientConfig.h" +#include "RemoteClientConfig.h" +#include "SubscriptionGroupConfig.h" +#include "TopicConfig.h" +#include "ConsumeStats.h" +#include "TopicStatsTable.h" +#include "KVTable.h" +#include "TopicRouteData.h" +#include "SendResult.h" +#include "PullResult.h" +#include "MessageExt.h" +#include "CommunicationMode.h" +#include "TopAddressing.h" +#include "HeartbeatData.h" +#include "LockBatchBody.h" + +namespace rmq +{ +class ClientConfig; +class TcpRemotingClient; +class QueryConsumerOffsetRequestHeader; +class UpdateConsumerOffsetRequestHeader; +class EndTransactionRequestHeader; +class SendMessageRequestHeader; +class PullMessageRequestHeader; +class QueryMessageRequestHeader; +class ProducerConnection; +class ConsumerConnection; +class ClusterInfo; +class TopicList; +class InvokeCallback; +class RemotingCommand; +class PullCallback; +class SendCallback; +class ClientRemotingProcessor; + +class MQClientAPIImpl +{ + public: + MQClientAPIImpl(ClientConfig& clientConfig, + const RemoteClientConfig& remoteClientConfig, + ClientRemotingProcessor* pClientRemotingProcessor); + ~MQClientAPIImpl(); + + void start(); + void shutdown(); + + std::string getProjectGroupPrefix(); + std::vector<std::string> getNameServerAddressList(); + void updateNameServerAddressList(const std::string& addrs); + std::string fetchNameServerAddr(); + + void createSubscriptionGroup(const std::string& addr, + SubscriptionGroupConfig config, + int timeoutMillis); + + void createTopic(const std::string& addr, + const std::string& defaultTopic, + TopicConfig topicConfig, + int timeoutMillis); + + SendResult sendMessage(const std::string& addr, + const std::string& brokerName, + Message& msg, + SendMessageRequestHeader* pRequestHeader, + int timeoutMillis, + CommunicationMode communicationMode, + SendCallback* pSendCallback); + + PullResult* pullMessage(const std::string& addr, + PullMessageRequestHeader* pRequestHeader, + int timeoutMillis, + CommunicationMode communicationMode, + PullCallback* pPullCallback); + + MessageExt* viewMessage(const std::string& addr, long long phyoffset, int timeoutMillis); + + + long long searchOffset(const std::string& addr, + const std::string& topic, + int queueId, + long long timestamp, + int timeoutMillis); + + long long getMaxOffset(const std::string& addr, + const std::string& topic, + int queueId, + int timeoutMillis); + + std::list<std::string> getConsumerIdListByGroup(const std::string& addr, + const std::string& consumerGroup, + int timeoutMillis); + + long long getMinOffset(const std::string& addr, + const std::string& topic, + int queueId, + int timeoutMillis); + + long long getEarliestMsgStoretime(const std::string& addr, + const std::string& topic, + int queueId, + int timeoutMillis); + + long long queryConsumerOffset(const std::string& addr, + QueryConsumerOffsetRequestHeader* pRequestHeader, + int timeoutMillis); + + void updateConsumerOffset(const std::string& addr, + UpdateConsumerOffsetRequestHeader* pRequestHeader, + int timeoutMillis); + + void updateConsumerOffsetOneway(const std::string& addr, + UpdateConsumerOffsetRequestHeader* pRequestHeader, + int timeoutMillis); + + void sendHearbeat(const std::string& addr, HeartbeatData* pHeartbeatData, int timeoutMillis); + + void unregisterClient(const std::string& addr, + const std::string& clientID, + const std::string& producerGroup, + const std::string& consumerGroup, + int timeoutMillis); + + void endTransactionOneway(const std::string& addr, + EndTransactionRequestHeader* pRequestHeader, + const std::string& remark, + int timeoutMillis); + + void queryMessage(const std::string& addr, + QueryMessageRequestHeader* pRequestHeader, + int timeoutMillis, + InvokeCallback* pInvokeCallback); + + bool registerClient(const std::string& addr, + HeartbeatData& heartbeat, + int timeoutMillis); + + void consumerSendMessageBack(const std::string& addr, + MessageExt& msg, + const std::string& consumerGroup, + int delayLevel, + int timeoutMillis); + + std::set<MessageQueue> lockBatchMQ(const std::string& addr, + LockBatchRequestBody* pRequestBody, + int timeoutMillis); + + void unlockBatchMQ(const std::string& addr, + UnlockBatchRequestBody* pRequestBody, + int timeoutMillis, + bool oneway); + + TopicStatsTable getTopicStatsInfo(const std::string& addr, + const std::string& topic, + int timeoutMillis); + + ConsumeStats getConsumeStats(const std::string& addr, + const std::string& consumerGroup, + int timeoutMillis); + + ProducerConnection* getProducerConnectionList(const std::string& addr, + const std::string& producerGroup, + int timeoutMillis); + + ConsumerConnection* getConsumerConnectionList(const std::string& addr, + const std::string& consumerGroup, + int timeoutMillis); + + KVTable getBrokerRuntimeInfo(const std::string& addr, int timeoutMillis); + + void updateBrokerConfig(const std::string& addr, + const std::map<std::string, std::string>& properties, + int timeoutMillis); + + ClusterInfo* getBrokerClusterInfo(int timeoutMillis); + + TopicRouteData* getDefaultTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis); + + TopicRouteData* getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis); + + TopicList* getTopicListFromNameServer(int timeoutMillis); + + int wipeWritePermOfBroker(const std::string& namesrvAddr, + const std::string& brokerName, + int timeoutMillis); + + void deleteTopicInBroker(const std::string& addr, const std::string& topic, int timeoutMillis); + void deleteTopicInNameServer(const std::string& addr, const std::string& topic, int timeoutMillis); + void deleteSubscriptionGroup(const std::string& addr, + const std::string& groupName, + int timeoutMillis); + + std::string getKVConfigValue(const std::string& projectNamespace, + const std::string& key, + int timeoutMillis); + + void putKVConfigValue(const std::string& projectNamespace, + const std::string& key, + const std::string& value, + int timeoutMillis); + + void deleteKVConfigValue(const std::string& projectNamespace, const std::string& key, int timeoutMillis); + + std::string getProjectGroupByIp(const std::string& ip, int timeoutMillis); + + std::string getKVConfigByValue(const std::string& projectNamespace, + const std::string& projectGroup, + int timeoutMillis); + + KVTable getKVListByNamespace(const std::string& projectNamespace, int timeoutMillis); + + void deleteKVConfigByValue(const std::string& projectNamespace, + const std::string& projectGroup, + int timeoutMillis); + + TcpRemotingClient* getRemotingClient(); + + SendResult* processSendResponse(const std::string& brokerName, + const std::string& topic, + RemotingCommand* pResponse); + + PullResult* processPullResponse(RemotingCommand* pResponse); + + private: + SendResult* sendMessageSync(const std::string& addr, + const std::string& brokerName, + Message& msg, + int timeoutMillis, + RemotingCommand* request); + + void sendMessageAsync(const std::string& addr, + const std::string& brokerName, + Message& msg, + int timeoutMillis, + RemotingCommand* request, + SendCallback* pSendCallback); + + void pullMessageAsync(const std::string& addr, + RemotingCommand* pRequest, + int timeoutMillis, + PullCallback* pPullCallback); + + PullResult* pullMessageSync(const std::string& addr, + RemotingCommand* pRequest, + int timeoutMillis); + + private: + TcpRemotingClient* m_pRemotingClient; + TopAddressing m_topAddressing; + ClientRemotingProcessor* m_pClientRemotingProcessor; + std::string m_nameSrvAddr; + std::string m_projectGroupPrefix; + }; +} + +#endif
