http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/RemotingCommand.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/RemotingCommand.cpp b/rocketmq-client4cpp/src/protocol/RemotingCommand.cpp new file mode 100755 index 0000000..2f58d20 --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/RemotingCommand.cpp @@ -0,0 +1,421 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "RemotingCommand.h" + +#include <sstream> +#include <string> +#include <stdlib.h> +#include <string.h> +#include <json/json.h> +#include "SocketUtil.h" +#include "CommandCustomHeader.h" +#include "MQVersion.h" + +namespace rmq +{ + +kpr::AtomicInteger RemotingCommand::s_seqNumber = 0; +volatile int RemotingCommand::s_configVersion = MQVersion::s_CurrentVersion; + +RemotingCommand::RemotingCommand(int code) + : m_code(code), m_language("CPP"), m_version(0), m_opaque(s_seqNumber++), + m_flag(0), m_remark(""), m_pCustomHeader(NULL), + m_dataLen(0), m_pData(NULL), m_bodyLen(0), m_pBody(NULL), m_releaseBody(false) +{ +} + +RemotingCommand::RemotingCommand(int code, + const std::string& language, + int version, + int opaque, + int flag, + const std::string& remark, + CommandCustomHeader* pCustomHeader) + : m_code(code), m_language(language), m_version(version), m_opaque(opaque), + m_flag(flag), m_remark(remark), m_pCustomHeader(pCustomHeader), + m_dataLen(0), m_pData(NULL), m_bodyLen(0), m_pBody(NULL), m_releaseBody(false) +{ + +} + +RemotingCommand::~RemotingCommand() +{ + if (m_pData) + { + delete[] m_pData; + } + + if (m_releaseBody) + { + delete[] m_pBody; + m_bodyLen = 0; + m_pBody = NULL; + } + + // TODO: maybe memleak + if (m_pCustomHeader) + { + delete m_pCustomHeader; + m_pCustomHeader = NULL; + } +} + +void RemotingCommand::encode() +{ + std::string extHeader = "{}"; + if (m_pCustomHeader) + { + m_pCustomHeader->encode(extHeader); + } + + std::stringstream ss; + ss << "{" + << CODE_STRING << m_code << "," + << language_STRING << "\"CPP\"," + << version_STRING << m_version << "," + << opaque_STRING << m_opaque << "," + << flag_STRING << m_flag << "," + << remark_STRING << "\"" << m_remark << "\"," + << extFields_STRING << extHeader + << "}"; + + /* protocol: + * | 4 | 4 | headerlen | bodylen | + * | 1-length | 2-headerlen | 3-headerdata | 4-bodydata | + */ + int headLen = ss.str().size(); + m_dataLen = 8 + headLen + m_bodyLen; + m_pData = new char[m_dataLen]; + + //length = len(2 + 3 + 4) + int tmp = htonl(4 + headLen + m_bodyLen); + memcpy(m_pData, &tmp, 4); + + //headerlength = len(3) + tmp = htonl(headLen); + memcpy(m_pData + 4, &tmp, 4); + + //headerdata + memcpy(m_pData + 8, ss.str().c_str(), headLen); + + //bodydata + if (m_pBody) + { + memcpy(m_pData + 8 + headLen, m_pBody, m_bodyLen); + } + + //RMQ_DEBUG("encode|%s%s", ss.str().c_str(), m_pBody ? std::string(m_pBody, m_bodyLen).c_str() : ""); +} + +std::string RemotingCommand::toString() const +{ + std::string extHeader; + if (m_pCustomHeader) + { + m_pCustomHeader->encode(extHeader); + } + + std::stringstream ss; + ss << "{" + << CODE_STRING << m_code << "," + << language_STRING << "\"CPP\"," + << version_STRING << m_version << "," + << opaque_STRING << m_opaque << "," + << flag_STRING << m_flag << "," + << remark_STRING << "\"" << m_remark << "\""; + if (!extHeader.empty()) + { + ss << "," << extFields_STRING << extHeader; + } + ss << "}"; + + if (m_pBody) + { + ss << "|" << m_bodyLen << "|" << std::string(m_pBody, m_bodyLen); + } + + return ss.str(); +} + + +const char* RemotingCommand::getData() +{ + return m_pData; +} + +int RemotingCommand::getDataLen() +{ + return m_dataLen; +} + +const char* RemotingCommand::getBody() +{ + return m_pBody; +} + +int RemotingCommand::getBodyLen() +{ + return m_bodyLen; +} + +void RemotingCommand::setBody(char* pData, int len, bool copy) +{ + m_releaseBody = copy; + + if (copy) + { + m_pBody = new char[len]; + m_bodyLen = len; + memcpy(m_pBody, pData, len); + } + else + { + m_pBody = pData; + m_bodyLen = len; + } +} + +RemotingCommand* RemotingCommand::decode(const char* pData, int len) +{ + Json::Reader reader; + Json::Value object; + + int headLen; + memcpy(&headLen, pData + 4, 4); + headLen = ntohl(headLen); + + //RMQ_DEBUG("decode[%d,%d,%d]|%s%s", len, headLen, len - 8 - headLen, std::string(pData + 8, headLen).c_str(), + // std::string(pData + 8 + headLen, len - 8 - headLen).c_str()); + + if (!reader.parse(pData + 8, pData + 8 + headLen, object)) + { + RMQ_ERROR("parse header fail, %s", std::string(pData + 8, headLen).c_str()); + return NULL; + } + + int code = object["code"].asInt(); + std::string language = object["language"].asString(); + int version = object["version"].asInt(); + int opaque = object["opaque"].asInt(); + int flag = object["flag"].asInt(); + + Json::Value v = object["remark"]; + std::string remark = ""; + if (!v.isNull()) + { + remark = object["remark"].asString(); + } + + RemotingCommand* cmd = new RemotingCommand(code, + language, + version, + opaque, + flag, + remark, + NULL); + + int bodyLen = len - 8 - headLen; + if (bodyLen > 0) + { + cmd->setBody((char*)(pData + 8 + headLen), bodyLen, true); + } + + return cmd; +} + +CommandCustomHeader* RemotingCommand::makeCustomHeader(int code, const char* pData, int len) +{ + Json::Reader reader; + Json::Value object; + + int headLen; + memcpy(&headLen, pData + 4, 4); + headLen = ntohl(headLen); + + if (!reader.parse(pData + 8, pData + 8 + headLen, object)) + { + RMQ_ERROR("parse header fail, %s", std::string(pData + 8, headLen).c_str()); + return NULL; + } + + if (object.isMember("extFields") && object["extFields"].isObject() && object["extFields"].size() > 0) + { + CommandCustomHeader* pCustomHeader = CommandCustomHeader::decode( + code, object["extFields"], isResponseType()); + if (pCustomHeader == NULL) + { + RMQ_WARN("invalid extFields, %d, %s", code, std::string(pData + 8, headLen).c_str()); + } + + setCommandCustomHeader(pCustomHeader); + return pCustomHeader; + } + + return NULL; +} + + +RemotingCommand* RemotingCommand::createRequestCommand(int code, CommandCustomHeader* pCustomHeader) +{ + RemotingCommand* cmd = new RemotingCommand(code); + cmd->setCommandCustomHeader(pCustomHeader); + setCmdVersion(cmd); + + return cmd; +} + +RemotingCommand* RemotingCommand::createResponseCommand(int code, const std::string& remark) +{ + return createResponseCommand(code, remark, NULL); +} + + +RemotingCommand* RemotingCommand::createResponseCommand(int code, const std::string& remark, + CommandCustomHeader* pCustomHeader) +{ + RemotingCommand* cmd = new RemotingCommand(code); + cmd->markResponseType(); + cmd->setRemark(remark); + setCmdVersion(cmd); + + if (pCustomHeader) + { + cmd->setCommandCustomHeader(pCustomHeader); + } + + return cmd; +} + + +void RemotingCommand::markResponseType() +{ + int bits = 1 << RPC_TYPE; + m_flag |= bits; +} + +bool RemotingCommand::isResponseType() +{ + int bits = 1 << RPC_TYPE; + return (m_flag & bits) == bits; +} + +void RemotingCommand::markOnewayRPC() +{ + int bits = 1 << RPC_ONEWAY; + m_flag |= bits; +} + +bool RemotingCommand::isOnewayRPC() +{ + int bits = 1 << RPC_ONEWAY; + return (m_flag & bits) == bits; +} + +void RemotingCommand::setCmdVersion(RemotingCommand* pCmd) +{ + if (s_configVersion >= 0) + { + pCmd->setVersion(s_configVersion); + } + else + { + int value = MQVersion::s_CurrentVersion; + pCmd->setVersion(value); + s_configVersion = value; + } +} + +int RemotingCommand::getCode() +{ + return m_code; +} + +void RemotingCommand::setCode(int code) +{ + m_code = code; +} + +std::string RemotingCommand::getLanguage() +{ + return m_language; +} + +void RemotingCommand::setLanguage(const std::string& language) +{ + m_language = language; +} + +int RemotingCommand::getVersion() +{ + return m_version; +} + +void RemotingCommand::setVersion(int version) +{ + m_version = version; +} + +int RemotingCommand::getOpaque() +{ + return m_opaque; +} + +void RemotingCommand::setOpaque(int opaque) +{ + m_opaque = opaque; +} + +int RemotingCommand::getFlag() +{ + return m_flag; +} + +void RemotingCommand::setFlag(int flag) +{ + m_flag = flag; +} + +std::string RemotingCommand::getRemark() +{ + return m_remark; +} + +void RemotingCommand::setRemark(const std::string& remark) +{ + m_remark = remark; +} + +void RemotingCommand::setCommandCustomHeader(CommandCustomHeader* pCommandCustomHeader) +{ + m_pCustomHeader = pCommandCustomHeader; +} + +CommandCustomHeader* RemotingCommand::getCommandCustomHeader() +{ + return m_pCustomHeader; +} + +RemotingCommandType RemotingCommand::getType() +{ + if (isResponseType()) + { + return RESPONSE_COMMAND; + } + + return REQUEST_COMMAND; +} + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/RemotingCommand.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/RemotingCommand.h b/rocketmq-client4cpp/src/protocol/RemotingCommand.h new file mode 100755 index 0000000..c51fcfd --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/RemotingCommand.h @@ -0,0 +1,153 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __REMOTINGCOMMAND_H__ +#define __REMOTINGCOMMAND_H__ + +#include <sstream> +#include <string> + +#include "RocketMQClient.h" +#include "AtomicValue.h" +#include "RefHandle.h" + +namespace rmq +{ + const std::string CODE_STRING = "\"code\":"; + const std::string language_STRING = "\"language\":"; + const std::string version_STRING = "\"version\":"; + const std::string opaque_STRING = "\"opaque\":"; + const std::string flag_STRING = "\"flag\":"; + const std::string remark_STRING = "\"remark\":"; + const std::string extFields_STRING = "\"extFields\":"; + + const std::string RemotingVersionKey = "rocketmq.remoting.version"; + + class CommandCustomHeader; + + typedef enum + { + REQUEST_COMMAND, + RESPONSE_COMMAND + } RemotingCommandType; + + typedef enum + { + SUCCESS_VALUE = 0, + SYSTEM_ERROR_VALUE, + SYSTEM_BUSY_VALUE, + REQUEST_CODE_NOT_SUPPORTED_VALUE, + } ResponseCode; + + typedef enum + { + JAVA, + CPP, + DOTNET, + PYTHON, + DELPHI, + ERLANG, + RUBY, + OTHER, + } LanguageCode; + + const int RPC_TYPE = 0; // 0, REQUEST_COMMAND // 1, RESPONSE_COMMAND + const int RPC_ONEWAY = 1; // 0, RPC // 1, Oneway + + class RemotingCommand : public kpr::RefCount + { + public: + RemotingCommand(int code); + RemotingCommand(int code, + const std::string& language, + int version, + int opaque, + int flag, + const std::string& remark, + CommandCustomHeader* pCustomHeader); + ~RemotingCommand(); + + void encode(); + std::string toString() const; + + const char* getData(); + int getDataLen(); + + const char* getBody(); + int getBodyLen(); + void setBody(char* pData, int len, bool copy); + CommandCustomHeader* makeCustomHeader(int code, const char* pData, int len); + + int getCode(); + void setCode(int code); + + std::string getLanguage(); + void setLanguage(const std::string& language); + + int getVersion(); + void setVersion(int version); + + int getOpaque(); + void setOpaque(int opaque); + + int getFlag(); + void setFlag(int flag); + + std::string getRemark(); + void setRemark(const std::string& remark); + + void setCommandCustomHeader(CommandCustomHeader* pCommandCustomHeader); + CommandCustomHeader* getCommandCustomHeader(); + + RemotingCommandType getType(); + void markResponseType(); + bool isResponseType() ; + void markOnewayRPC(); + bool isOnewayRPC(); + + static void setCmdVersion(RemotingCommand* pCmd); + static RemotingCommand* decode(const char* pData, int len); + static RemotingCommand* createRequestCommand(int code, CommandCustomHeader* pCustomHeader); + static RemotingCommand* createResponseCommand(int code, const std::string& remark); + static RemotingCommand* createResponseCommand(int code, const std::string& remark, CommandCustomHeader* pCustomHeader); + + + private: + static volatile int s_configVersion; + + private: + int m_code; + std::string m_language; + int m_version; + int m_opaque; + int m_flag; + std::string m_remark; + CommandCustomHeader* m_pCustomHeader; + + int m_dataLen; + char* m_pData; + + int m_bodyLen; + char* m_pBody; + + bool m_releaseBody; + + static kpr::AtomicInteger s_seqNumber; + }; + typedef kpr::RefHandleT<RemotingCommand> RemotingCommandPtr; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/RemotingSerializable.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/RemotingSerializable.h b/rocketmq-client4cpp/src/protocol/RemotingSerializable.h new file mode 100755 index 0000000..8e50ab0 --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/RemotingSerializable.h @@ -0,0 +1,33 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __REMOTINGSERIALIZABLE_H__ +#define __REMOTINGSERIALIZABLE_H__ + +#include "RocketMQClient.h" +#include "RefHandle.h" + +namespace rmq +{ + class RemotingSerializable : public kpr::RefCount + { + public: + virtual ~RemotingSerializable() {}; + virtual void encode(std::string& outData) = 0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/TopicList.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/TopicList.h b/rocketmq-client4cpp/src/protocol/TopicList.h new file mode 100755 index 0000000..e827540 --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/TopicList.h @@ -0,0 +1,60 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __TOPICLIST_H__ +#define __TOPICLIST_H__ + +#include <set> +#include <string> +#include <UtilAll.h> + +namespace rmq +{ + class TopicList : public RemotingSerializable + { + public: + static TopicList* decode(const char* pData, int len) + { + return new TopicList(); + } + + void encode(std::string& outData) + { + } + + std::string toString() const + { + std::stringstream ss; + ss << "{topicList=" << UtilAll::toString(m_topicList) + << "}"; + return ss.str(); + } + + const std::set<std::string>& getTopicList() + { + return m_topicList; + } + + void setTopicList(const std::set<std::string>& topicList) + { + m_topicList = topicList; + } + + private: + std::set<std::string> m_topicList; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/TopicRouteData.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/TopicRouteData.h b/rocketmq-client4cpp/src/protocol/TopicRouteData.h new file mode 100755 index 0000000..a40ef7d --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/TopicRouteData.h @@ -0,0 +1,279 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __TOPICROUTEDATA_H__ +#define __TOPICROUTEDATA_H__ + +#include <stdio.h> +#include <stdlib.h> +#include <iostream> +#include <list> +#include <map> +#include <string> +#include <sstream> +#include "RocketMQClient.h" +#include "RemotingSerializable.h" +#include "UtilAll.h" +#include "MixAll.h" +#include "json/json.h" + +namespace rmq +{ + struct QueueData + { + std::string brokerName; + int readQueueNums; + int writeQueueNums; + int perm; + + bool operator < (const QueueData& other) + { + return brokerName < other.brokerName; + } + + bool operator==(const QueueData& other)const + { + if (brokerName == other.brokerName + && readQueueNums == other.readQueueNums + && writeQueueNums == other.writeQueueNums + && perm == other.perm) + { + return true; + } + + return false; + } + + std::string toString() const + { + std::stringstream ss; + ss << "{brokerName=" << brokerName + << ",readQueueNums=" << readQueueNums + << ",writeQueueNums=" << writeQueueNums + << ",perm=" << perm + << "}"; + return ss.str(); + } + }; + inline std::ostream& operator<<(std::ostream& os, const QueueData& obj) + { + os << obj.toString(); + return os; + } + + + struct BrokerData + { + std::string brokerName; + std::map<int, std::string> brokerAddrs; + + bool operator < (const BrokerData& other) + { + return brokerName < other.brokerName; + } + + bool operator == (const BrokerData& other)const + { + if (brokerName == other.brokerName + && brokerAddrs == other.brokerAddrs) + { + return true; + } + + return false; + } + + std::string toString() const + { + std::stringstream ss; + ss << "{brokerName=" << brokerName + << ",brokerAddrs=" << UtilAll::toString(brokerAddrs) + << "}"; + return ss.str(); + } + }; + + + inline std::ostream& operator<<(std::ostream& os, const BrokerData& obj) + { + os << obj.toString(); + return os; + } + + + class TopicRouteData : public RemotingSerializable + { + public: + void encode(std::string& outData) + { + + } + + static TopicRouteData* encode(const char* pData, int len) + { + /* + { + "orderTopicConf":"", + "brokerDatas":[ + {"brokerAddrs":{0:"10.134.143.77:10911"},"brokerName":"broker-a"} + ], + "filterServerTable":{}, + "queueDatas":[ + {"brokerName":"broker-a","perm":6,"readQueueNums":4,"topicSynFlag":0,"writeQueueNums":4} + ] + } + */ + Json::Reader reader; + Json::Value object; + + if (!reader.parse(pData, pData + len, object)) + { + RMQ_ERROR("parse fail:%s", reader.getFormattedErrorMessages().c_str()); + return NULL; + } + + TopicRouteData* trd = new TopicRouteData(); + trd->setOrderTopicConf(object["orderTopicConf"].asString()); + + Json::Value qds = object["queueDatas"]; + for (size_t i = 0; i < qds.size(); i++) + { + QueueData d; + Json::Value qd = qds[i]; + d.brokerName = qd["brokerName"].asString(); + d.readQueueNums = qd["readQueueNums"].asInt(); + d.writeQueueNums = qd["writeQueueNums"].asInt(); + d.perm = qd["perm"].asInt(); + + trd->getQueueDatas().push_back(d); + } + + Json::Value bds = object["brokerDatas"]; + for (size_t i = 0; i < bds.size(); i++) + { + BrokerData d; + Json::Value bd = bds[i]; + d.brokerName = bd["brokerName"].asString(); + + Json::Value bas = bd["brokerAddrs"]; + Json::Value::Members mbs = bas.getMemberNames(); + for (size_t i = 0; i < mbs.size(); i++) + { + std::string key = mbs.at(i); + d.brokerAddrs[atoi(key.c_str())] = bas[key].asString(); + } + + trd->getBrokerDatas().push_back(d); + } + + return trd; + } + + static std::string selectBrokerAddr(BrokerData& data) + { + std::map<int, std::string>::iterator it = data.brokerAddrs.find(MixAll::MASTER_ID); + std::string value = ""; + if (it == data.brokerAddrs.end()) + { + it = data.brokerAddrs.begin(); + if (it != data.brokerAddrs.end()) + { + value = it->second; + } + } + else + { + value = it->second; + } + + return value; + } + + std::list<QueueData>& getQueueDatas() + { + return m_queueDatas; + } + + void setQueueDatas(const std::list<QueueData>& queueDatas) + { + m_queueDatas = queueDatas; + } + + std::list<BrokerData>& getBrokerDatas() + { + return m_brokerDatas; + } + + void setBrokerDatas(const std::list<BrokerData>& brokerDatas) + { + m_brokerDatas = brokerDatas; + } + + const std::string& getOrderTopicConf() + { + return m_orderTopicConf; + } + + void setOrderTopicConf(const std::string& orderTopicConf) + { + m_orderTopicConf = orderTopicConf; + } + + bool operator ==(const TopicRouteData& other) + { + if (m_brokerDatas != other.m_brokerDatas) + { + return false; + } + + if (m_orderTopicConf != other.m_orderTopicConf) + { + return false; + } + + if (m_queueDatas != other.m_queueDatas) + { + return false; + } + + return true; + } + + std::string toString() const + { + std::stringstream ss; + ss << "{orderTopicConf=" << m_orderTopicConf + << ",queueDatas=" << UtilAll::toString(m_queueDatas) + << ",brokerDatas=" << UtilAll::toString(m_brokerDatas) + << "}"; + return ss.str(); + } + + private: + std::string m_orderTopicConf; + std::list<QueueData> m_queueDatas; + std::list<BrokerData> m_brokerDatas; + }; + typedef kpr::RefHandleT<TopicRouteData> TopicRouteDataPtr; + + inline std::ostream& operator<<(std::ostream& os, const TopicRouteData& obj) + { + os << obj.toString(); + return os; + } + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/InvokeCallback.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/transport/InvokeCallback.h b/rocketmq-client4cpp/src/transport/InvokeCallback.h new file mode 100755 index 0000000..4b5b3c7 --- /dev/null +++ b/rocketmq-client4cpp/src/transport/InvokeCallback.h @@ -0,0 +1,32 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __INVOKECALLBACK_H__ +#define __INVOKECALLBACK_H__ + +#include "ResponseFuture.h" + +namespace rmq +{ + class InvokeCallback + { + public: + virtual ~InvokeCallback() {} + virtual void operationComplete(ResponseFuturePtr pResponseFuture) = 0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/RemoteClientConfig.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/transport/RemoteClientConfig.h b/rocketmq-client4cpp/src/transport/RemoteClientConfig.h new file mode 100755 index 0000000..930fc78 --- /dev/null +++ b/rocketmq-client4cpp/src/transport/RemoteClientConfig.h @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __REMOTECLIENTCONFIG_H__ +#define __REMOTECLIENTCONFIG_H__ + +#include <unistd.h> +#include <sys/sysinfo.h> + +namespace rmq +{ + /** + * remote client config + * + */ + class RemoteClientConfig + { + public: + RemoteClientConfig() + { + clientWorkerThreads = 4; + clientCallbackExecutorThreads = get_nprocs(); + clientSelectorThreads = 1; + clientOnewaySemaphoreValue = 2048; + clientAsyncSemaphoreValue = 2048; + connectTimeoutMillis = 3000; + channelNotActiveInterval = 1000 * 60; + clientChannelMaxIdleTimeSeconds = 120; + clientSocketSndBufSize = 65535; + clientSocketRcvBufSize = 65535; + + nsL5ModId = 0; + nsL5CmdId = 0; + } + + // Server Response/Request + int clientWorkerThreads; + int clientCallbackExecutorThreads; + int clientSelectorThreads; + int clientOnewaySemaphoreValue; + int clientAsyncSemaphoreValue; + int connectTimeoutMillis; + + int channelNotActiveInterval; + int clientChannelMaxIdleTimeSeconds; + int clientSocketSndBufSize; + int clientSocketRcvBufSize; + + int nsL5ModId; + int nsL5CmdId; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/ResponseFuture.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/transport/ResponseFuture.cpp b/rocketmq-client4cpp/src/transport/ResponseFuture.cpp new file mode 100755 index 0000000..c80fb84 --- /dev/null +++ b/rocketmq-client4cpp/src/transport/ResponseFuture.cpp @@ -0,0 +1,183 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "ResponseFuture.h" + +#include "RocketMQClient.h" +#include "KPRUtil.h" +#include "InvokeCallback.h" +#include "Monitor.h" +#include "Semaphore.h" +#include "ScopedLock.h" + +namespace rmq +{ + +ResponseFuture::ResponseFuture(int requestCode, int opaque, int timeoutMillis, + InvokeCallback* pInvokeCallback, bool block, kpr::Semaphore* pSem) +{ + m_requestCode = requestCode; + m_opaque = opaque; + m_timeoutMillis = timeoutMillis; + m_pInvokeCallback = pInvokeCallback; + m_beginTimestamp = KPRUtil::GetCurrentTimeMillis(); + m_pResponseCommand = NULL; + m_notifyFlag = false; + m_pMonitor = NULL; + m_sendRequestOK = false; + m_exec = 0; + + m_pSemaphore = pSem; + m_released = 0; + + if (block) + { + m_pMonitor = new kpr::Monitor(); + } +} + +ResponseFuture::~ResponseFuture() +{ + if (m_pMonitor) + { + delete m_pMonitor; + } +} + +void ResponseFuture::executeInvokeCallback() +{ + if (m_pInvokeCallback != NULL) + { + if (m_exec.compareAndSet(0, 1)) + { + try + { + m_pInvokeCallback->operationComplete(this); + } + catch(std::exception& e) + { + RMQ_ERROR("executeInvokeCallback exception: %s", e.what()); + } + catch(...) + { + RMQ_ERROR("executeInvokeCallback exception"); + } + } + } +} + +void ResponseFuture::release() +{ + if (m_pSemaphore != NULL) + { + if (m_released.compareAndSet(0, 1)) + { + m_pSemaphore->Release(); + } + } +} + +bool ResponseFuture::isTimeout() +{ + long long diff = KPRUtil::GetCurrentTimeMillis() - m_beginTimestamp; + return diff > m_timeoutMillis; +} + +RemotingCommand* ResponseFuture::waitResponse(int timeoutMillis) +{ + if (m_pMonitor) + { + kpr::ScopedLock<kpr::Monitor> lock(*m_pMonitor); + if (!m_notifyFlag) + { + m_pMonitor->Wait(timeoutMillis); + } + } + + return m_pResponseCommand; +} + +void ResponseFuture::putResponse(RemotingCommand* pResponseCommand) +{ + m_pResponseCommand = pResponseCommand; + if (m_pMonitor) + { + kpr::ScopedLock<kpr::Monitor> lock(*m_pMonitor); + m_notifyFlag = true; + m_pMonitor->Notify(); + } +} + +long long ResponseFuture::getBeginTimestamp() +{ + return m_beginTimestamp; +} + +bool ResponseFuture::isSendRequestOK() +{ + return m_sendRequestOK; +} + +void ResponseFuture::setSendRequestOK(bool sendRequestOK) +{ + m_sendRequestOK = sendRequestOK; +} + +long long ResponseFuture::getTimeoutMillis() +{ + return m_timeoutMillis; +} + +InvokeCallback* ResponseFuture::getInvokeCallback() +{ + return m_pInvokeCallback; +} + +RemotingCommand* ResponseFuture::getResponseCommand() +{ + return m_pResponseCommand; +} + +void ResponseFuture::setResponseCommand(RemotingCommand* pResponseCommand) +{ + m_pResponseCommand = pResponseCommand; +} + +int ResponseFuture::getOpaque() +{ + return m_opaque; +} + +int ResponseFuture::getRequestCode() +{ + return m_requestCode; +} + +void ResponseFuture::setRequestCode(int requestCode) +{ + m_requestCode = requestCode; +} + +std::string ResponseFuture::toString() const +{ + std::stringstream oss; + oss << "{responseCommand=" << m_pResponseCommand << ",sendRequestOK=" << m_sendRequestOK + << ",opaque=" << m_opaque << ",timeoutMillis=" << m_timeoutMillis + << ",invokeCallback=" << m_pInvokeCallback << ",beginTimestamp=" << m_beginTimestamp + << "}"; + return oss.str(); +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/ResponseFuture.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/transport/ResponseFuture.h b/rocketmq-client4cpp/src/transport/ResponseFuture.h new file mode 100755 index 0000000..f1dfc01 --- /dev/null +++ b/rocketmq-client4cpp/src/transport/ResponseFuture.h @@ -0,0 +1,77 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __RESPONSEFUTURE_H__ +#define __RESPONSEFUTURE_H__ + +#include <string> +#include "AtomicValue.h" +#include "RefHandle.h" + +namespace kpr +{ +class Monitor; +class Semaphore; +} + +namespace rmq +{ + class InvokeCallback; + class RemotingCommand; + + class ResponseFuture : public kpr::RefCount + { + public: + ResponseFuture(int requestCode, int opaque, int timeoutMillis, InvokeCallback* pInvokeCallback, + bool block, kpr::Semaphore* pSem); + ~ResponseFuture(); + void executeInvokeCallback(); + void release(); + bool isTimeout(); + RemotingCommand* waitResponse(int timeoutMillis); + void putResponse(RemotingCommand* pResponseCommand); + long long getBeginTimestamp(); + bool isSendRequestOK(); + void setSendRequestOK(bool sendRequestOK); + int getRequestCode(); + void setRequestCode(int requestCode); + long long getTimeoutMillis(); + InvokeCallback* getInvokeCallback(); + RemotingCommand* getResponseCommand(); + void setResponseCommand(RemotingCommand* pResponseCommand); + int getOpaque(); + std::string toString() const; + + private: + RemotingCommand* m_pResponseCommand; + volatile bool m_sendRequestOK; + int m_requestCode; + int m_opaque; + long long m_timeoutMillis; + InvokeCallback* m_pInvokeCallback; + long long m_beginTimestamp; + kpr::Monitor* m_pMonitor; + bool m_notifyFlag; + + kpr::AtomicInteger m_exec; + + kpr::Semaphore* m_pSemaphore; + kpr::AtomicInteger m_released; + }; + typedef kpr::RefHandleT<ResponseFuture> ResponseFuturePtr; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/SocketUtil.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/transport/SocketUtil.cpp b/rocketmq-client4cpp/src/transport/SocketUtil.cpp new file mode 100755 index 0000000..a1e0d57 --- /dev/null +++ b/rocketmq-client4cpp/src/transport/SocketUtil.cpp @@ -0,0 +1,250 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "SocketUtil.h" +#include "MixAll.h" +#include "MQClientException.h" + + +namespace rmq +{ + +int SocketInit() +{ + signal(SIGPIPE, SIG_IGN); + + return 0; +} + +int MakeSocketNonblocking(SOCKET fd) +{ + int flags = fcntl(fd, F_GETFL, 0); + assert(flags != -1); + flags = (flags | O_NONBLOCK); + return fcntl(fd, F_SETFL, flags); +} + +int SetTcpNoDelay(SOCKET fd) +{ + int flag = 1; + return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (const char*)&flag, sizeof(flag)); +} + +bool SplitURL(const std::string& serverURL, std::string& addr, short& nPort) +{ + size_t pos = serverURL.find(':'); + if (pos == std::string::npos) + { + return false; + } + + addr = serverURL.substr(0, pos); + if (0 == addr.compare("localhost")) + { + addr = "127.0.0.1"; + } + pos++; + + std::string port = serverURL.substr(pos, serverURL.length() - pos); + nPort = atoi(port.c_str()); + return true; +} + +sockaddr string2SocketAddress(const std::string& addrString) +{ + std::string strAddr; + short port; + SplitURL(addrString, strAddr, port); + + struct sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons(port); + + sa.sin_addr.s_addr = inet_addr(strAddr.c_str()); + + sockaddr addr; + memcpy(&addr, &sa, sizeof(sockaddr)); + + return addr; +} + +std::string socketAddress2String(sockaddr addr) +{ + sockaddr_in in; + memcpy(&in, &addr, sizeof(sockaddr)); + + std::stringstream ss; + ss << inet_ntoa(in.sin_addr) << ":" << in.sin_port; + + return ss.str(); +} + +void GetLocalAddrs(std::vector<unsigned int>& addrs) +{ + addrs.clear(); + + struct ifconf ifc; + ifc.ifc_buf = NULL; + ifc.ifc_len = 0; + + int sfd = socket(AF_INET, SOCK_DGRAM, 0); + if (sfd != INVALID_SOCKET) + { + int ret = ioctl(sfd, SIOCGIFCONF, (char*)&ifc); + + if (ret != -1) + { + ifc.ifc_req = (struct ifreq*)malloc(ifc.ifc_len); + ret = ioctl(sfd, SIOCGIFCONF, (char*)&ifc); + if (ret != -1) + { + for (size_t i = 0; i < ifc.ifc_len / sizeof(struct ifreq); i++) + { + struct sockaddr* sa = (struct sockaddr*) & (ifc.ifc_req[i].ifr_addr); + if (AF_INET == sa->sa_family) + { + unsigned int addr = ((struct sockaddr_in*)sa)->sin_addr.s_addr; + addrs.push_back(htonl(addr)); + } + } + } + + free(ifc.ifc_req); + ifc.ifc_req = NULL; + } + + close(sfd); + } + + if (addrs.empty()) + { + char hostname[1024]; + + int ret = gethostname(hostname, sizeof(hostname)); + if (ret == 0) + { + struct addrinfo* result = NULL; + struct addrinfo* ptr = NULL; + struct addrinfo hints; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + + ret = getaddrinfo(hostname, NULL, &hints, &result); + if (ret == 0) + { + for (ptr = result; ptr != NULL ; ptr = ptr->ai_next) + { + + struct sockaddr_in* sockaddr_ipv4 = (struct sockaddr_in*) ptr->ai_addr; + addrs.push_back(ntohl(sockaddr_ipv4->sin_addr.s_addr)); + } + } + + freeaddrinfo(result); + } + } + + std::vector<unsigned int>::iterator it = addrs.begin(); + for (; it != addrs.end();) + { + if (*it >= 0x7F000000U && *it < 0x80000000U) + { + it = addrs.erase(it); + } + else + { + it++; + } + } + + if (addrs.empty()) + { + addrs.push_back(INADDR_LOOPBACK); + } +} + +std::string getLocalAddress() +{ + std::vector<unsigned int> addrs; + GetLocalAddrs(addrs); + struct in_addr addr; + addr.s_addr = htonl(addrs[0]); + + return inet_ntoa(addr); +} + +std::string getHostName(sockaddr addr) +{ + sockaddr_in in; + memcpy(&in, &addr, sizeof(sockaddr)); + + struct hostent* remoteHost = gethostbyaddr((char*) & (in.sin_addr), 4, AF_INET); + char** alias = remoteHost->h_aliases; + if (*alias != 0) + { + return *alias; + } + else + { + return inet_ntoa(in.sin_addr); + } +} + + +unsigned long long swapll(unsigned long long v) +{ +#ifdef ENDIANMODE_BIG + return v; +#else + unsigned long long ret = ((v << 56) + | ((v & 0xff00) << 40) + | ((v & 0xff0000) << 24) + | ((v & 0xff000000) << 8) + | ((v >> 8) & 0xff000000) + | ((v >> 24) & 0xff0000) + | ((v >> 40) & 0xff00) + | (v >> 56)); + + return ret; +#endif +} + +unsigned long long h2nll(unsigned long long v) +{ + return swapll(v); +} + +unsigned long long n2hll(unsigned long long v) +{ + return swapll(v); +} + +std::string socketAddress2IPPort(sockaddr addr) +{ + sockaddr_in in; + memcpy(&in, &addr, sizeof(sockaddr)); + + char tmp[32]; + snprintf(tmp, sizeof(tmp), "%s:%d", inet_ntoa(in.sin_addr), ntohs(in.sin_port)); + + std::string ipport = tmp; + return ipport; +} + + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/SocketUtil.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/transport/SocketUtil.h b/rocketmq-client4cpp/src/transport/SocketUtil.h new file mode 100755 index 0000000..bfd86d8 --- /dev/null +++ b/rocketmq-client4cpp/src/transport/SocketUtil.h @@ -0,0 +1,75 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __SOCKETUTIL_H__ +#define __SOCKETUTIL_H__ + +#include <unistd.h> +#include <sys/time.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <sys/select.h> +#include <sys/ioctl.h> +#include <netdb.h> +#include <netinet/in.h> +#include <net/if.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> +#include <fcntl.h> +#include <errno.h> +#include <signal.h> +#include <string> +#include <stdlib.h> +#include <assert.h> +#include <string.h> +#include <sstream> +#include <vector> +#include <iostream> + +#include "RocketMQClient.h" + + +#define NET_ERROR errno +#define SOCKET_ERROR -1 +#define INVALID_SOCKET -1 +#define WSAECONNRESET ECONNRESET +#define WSAEWOULDBLOCK EWOULDBLOCK +#define WSAEINPROGRESS EINPROGRESS +#define WSAEBADF EBADF +#define closesocket close +#define SD_SEND SHUT_WR +#define SD_RECEIVE SHUT_RD +#define SD_BOTH SHUT_RDWR +typedef int SOCKET; +#define SocketUninit() + +namespace rmq +{ + int SocketInit(); + int MakeSocketNonblocking(SOCKET fd); + int SetTcpNoDelay(SOCKET fd); + + bool SplitURL(const std::string& serverURL, std::string& addr, short& nPort); + sockaddr string2SocketAddress(const std::string& addr); + std::string socketAddress2String(sockaddr addr); + std::string socketAddress2IPPort(sockaddr addr); + std::string getHostName(sockaddr addr); + std::string getLocalAddress(); + + unsigned long long h2nll(unsigned long long v); + unsigned long long n2hll(unsigned long long v); +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/TcpRemotingClient.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/transport/TcpRemotingClient.cpp b/rocketmq-client4cpp/src/transport/TcpRemotingClient.cpp new file mode 100755 index 0000000..03b8ca7 --- /dev/null +++ b/rocketmq-client4cpp/src/transport/TcpRemotingClient.cpp @@ -0,0 +1,841 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); + +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "TcpRemotingClient.h" +#include "MQClientException.h" +#include "TcpRequestProcessor.h" +#include "MQProtos.h" +#include "ThreadPoolWork.h" + +namespace rmq +{ + + +ProcessDataWork::ProcessDataWork(TcpRemotingClient* pClient, TcpTransport* pTts, std::string* pData) + : m_pClient(pClient), m_pTts(pTts), m_pData(pData) +{ +} + +ProcessDataWork::~ProcessDataWork() +{ + delete m_pData; +} + +void ProcessDataWork::Do() +{ + try + { + m_pClient->processData(m_pTts, m_pData); + } + catch (std::exception& e) + { + RMQ_ERROR("processDataWork catch Exception: %s", e.what()); + } + catch (...) + { + RMQ_ERROR("processDataWork catch Exception"); + } +} + +TcpRemotingClient::TcpRemotingClient(const RemoteClientConfig& config) + : m_stop(false), m_epoller(false), m_config(config), + m_semaphoreOneway(s_ClientOnewaySemaphoreValue), m_semaphoreAsync(s_ClientAsyncSemaphoreValue) +{ + m_pNetThreadPool = new kpr::ThreadPool("NetClientThreadPool", 5, 5, 20); + m_pEventThread = new EventThread(*this); + SocketInit(); + m_epoller.create(10240); +} + +TcpRemotingClient::~TcpRemotingClient() +{ + SocketUninit(); +} + +void TcpRemotingClient::start() +{ + RMQ_DEBUG("TcpRemotingClient::start()"); + m_pEventThread->Start(); +} + +void TcpRemotingClient::shutdown() +{ + RMQ_DEBUG("TcpRemotingClient::shutdown()"); + m_stop = true; + m_pNetThreadPool->Destroy(); + m_pEventThread->Join(); +} + +/* +void printMsg(const std::string& prefix, const char* pData, int len) +{ + int headLen; + memcpy(&headLen, pData + 4, 4); + headLen = ntohl(headLen); + + RMQ_DEBUG("%s|decode[%d,%d,%d]|%s%s", prefix.c_str(), len, headLen, len - 8 - headLen, std::string(pData + 8, headLen).c_str(), + std::string(pData + 8 + headLen, len - 8 - headLen).c_str()); +} +*/ + +void TcpRemotingClient::run() +{ + RMQ_INFO("EventThread run begin: %lld", KPRUtil::GetCurrentTimeMillis()); + do + { + try + { + int nfds = m_epoller.wait(500); + if (nfds > 0) + { + int ret = 0; + std::vector<TcpTransport*> errTts; + for (int i = 0; i < nfds && !m_stop; ++i) + { + const epoll_event& ev = m_epoller.get(i); + std::map<std::string , TcpTransport*>::iterator it; + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_transportTableLock); + it = m_transportTable.find((char*)ev.data.ptr); + if (it == m_transportTable.end()) + { + continue; + } + } + + TcpTransport* pTts = it->second; + if (ev.events & EPOLLERR || ev.events & EPOLLHUP) + { + RMQ_ERROR("recvData fail, err=%d(%s), pts=%p", errno, strerror(errno), pTts); + errTts.push_back(pTts); + } + + if (ev.events & EPOLLIN) + { + std::list<std::string*> dataList; + ret = pTts->recvData(dataList); + if (ret < 0) + { + RMQ_ERROR("recvData fail, ret=%d, errno=%d, pts=%p", ret, NET_ERROR, pTts); + errTts.push_back(pTts); + } + + if (dataList.size() > 0) + { + for (typeof(dataList.begin()) it = dataList.begin(); + it != dataList.end(); it++) + { + //printMsg("run", (*it)->c_str(), (*it)->size()); + kpr::ThreadPoolWorkPtr work = new ProcessDataWork(this, pTts, *it); + m_pNetThreadPool->AddWork(work); + } + } + } + } + + std::vector<TcpTransport*>::iterator itErr = errTts.begin(); + for (; itErr != errTts.end(); itErr++) + { + removeTTS(*itErr, true); + } + } + + handleTimerEvent(); + } + catch (...) + { + RMQ_ERROR("TcpRemotingClient.run catch exception"); + } + } + while (!m_stop); + handleTimerEvent(); + + RMQ_INFO("EventThread run end: %lld", KPRUtil::GetCurrentTimeMillis()); +} + + +void TcpRemotingClient::updateNameServerAddressList(const std::vector<std::string>& addrs) +{ + m_namesrvAddrList = addrs; + m_namesrvIndex = 0; +} + +std::vector<std::string> TcpRemotingClient::getNameServerAddressList() +{ + return m_namesrvAddrList; +} + +void TcpRemotingClient::registerProcessor(int requestCode, TcpRequestProcessor* pProcessor) +{ + m_processorTable[requestCode] = pProcessor; +} + + +RemotingCommand* TcpRemotingClient::invokeSync(const std::string& addr, + RemotingCommand* pRequest, + int timeoutMillis) +{ + TcpTransport* pTts = getAndCreateTransport(addr, timeoutMillis); + if (pTts != NULL && pTts->isConnected()) + { + RemotingCommand* pResponse = NULL; + try + { + pResponse = invokeSyncImpl(pTts, pRequest, timeoutMillis); + } + catch(const RemotingSendRequestException& e) + { + RMQ_WARN("invokeSync: send pRequest exception, so close the channel[{%s}]", + pTts->getServerAddr().c_str()); + removeTTS(pTts, false); + throw e; + } + catch(const RemotingTimeoutException& e) + { + RMQ_WARN("invokeSync: wait response timeout exception, the channel[{%s}], timeout=%d", + pTts->getServerAddr().c_str(), timeoutMillis); + throw e; + } + + return pResponse; + } + else + { + removeTTS(pTts, false); + THROW_MQEXCEPTION(RemotingConnectException, "connect fail", -1); + //return NULL; + } +} + +void TcpRemotingClient::invokeAsync(const std::string& addr, + RemotingCommand* pRequest, + int timeoutMillis, + InvokeCallback* pInvokeCallback) +{ + TcpTransport* pTts = getAndCreateTransport(addr, timeoutMillis); + if (pTts != NULL && pTts->isConnected()) + { + try + { + this->invokeAsyncImpl(pTts, pRequest, timeoutMillis, pInvokeCallback); + } + catch (const RemotingSendRequestException& e) + { + RMQ_WARN("invokeAsync: send pRequest exception, so close the channel[{%s}]", addr.c_str()); + removeTTS(pTts, false); + throw e; + } + + return; + } + else + { + removeTTS(pTts, false); + std::string msg;msg.append("connect to <").append(addr).append("> failed"); + THROW_MQEXCEPTION(RemotingConnectException, msg, -1); + } +} + +int TcpRemotingClient::invokeOneway(const std::string& addr, + RemotingCommand* pRequest, + int timeoutMillis) +{ + TcpTransport* pTts = getAndCreateTransport(addr, timeoutMillis); + if (pTts != NULL && pTts->isConnected()) + { + return invokeOnewayImpl(pTts, pRequest, timeoutMillis); + } + else + { + removeTTS(pTts, false); + return -1; + } +} + + +TcpTransport* TcpRemotingClient::getAndCreateTransport(const std::string& addr, int timeoutMillis) +{ + if (addr.empty()) + { + return getAndCreateNameserverTransport(timeoutMillis); + } + + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_transportTableLock); + std::map<std::string , TcpTransport*>::iterator it = m_transportTable.find(addr); + if (it != m_transportTable.end()) + { + return it->second; + } + } + + return this->createTransport(addr, timeoutMillis); +} + + +TcpTransport* TcpRemotingClient::createTransport(const std::string& addr, int timeoutMillis) +{ + TcpTransport* pTts = NULL; + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_transportTableLock); + std::map<std::string , TcpTransport*>::iterator it = m_transportTable.find(addr); + if (it != m_transportTable.end()) + { + return it->second; + } + } + + if (m_transportTableLock.TryWriteLock(s_LockTimeoutMillis)) + { + std::map<std::string , TcpTransport*>::iterator it = m_transportTable.find(addr); + if (it != m_transportTable.end()) + { + return it->second; + } + + std::map<std::string , std::string> config; + pTts = new TcpTransport(config); + if (pTts->connect(addr, timeoutMillis) != CLIENT_ERROR_SUCCESS) + { + delete pTts; + pTts = NULL; + + RMQ_INFO("[NETWORK]: CONNECT {%s} failed", addr.c_str()); + } + else + { + m_transportTable[addr] = pTts; + m_epoller.add(pTts->getSocket(), (long long)((pTts->getServerAddr()).c_str()), EPOLLIN); + + RMQ_INFO("[NETWORK]: CONNECT => {%s} success", addr.c_str()); + } + m_transportTableLock.Unlock(); + } + else + { + RMQ_WARN("createTransport: try to lock m_transportTable, but timeout, {%d}ms", timeoutMillis); + } + + return pTts; +} + + +TcpTransport* TcpRemotingClient::getAndCreateNameserverTransport(int timeoutMillis) +{ + TcpTransport* pTts = NULL; + + if (m_namesrvAddrChoosed.get() != NULL) + { + std::string addr = *m_namesrvAddrChoosed; + if (!addr.empty()) + { + pTts = getAndCreateTransport(addr, timeoutMillis); + if (pTts != NULL) + { + return pTts; + } + } + } + + if (m_namesrvAddrChoosedLock.TryLock(s_LockTimeoutMillis)) + { + if (m_namesrvAddrChoosed.get() != NULL) + { + std::string addr = *m_namesrvAddrChoosed; + if (!addr.empty()) + { + pTts = getAndCreateTransport(addr, timeoutMillis); + if (pTts != NULL) + { + m_namesrvAddrChoosedLock.Unlock(); + return pTts; + } + } + } + + if (!m_namesrvAddrList.empty()) + { + for (size_t i = 0; i < m_namesrvAddrList.size(); i++) + { + int index = abs(++m_namesrvIndex) % m_namesrvAddrList.size(); + std::string& newAddr = m_namesrvAddrList.at(index); + m_namesrvAddrChoosed.set(&newAddr); + TcpTransport* pTts = getAndCreateTransport(newAddr, timeoutMillis); + if (pTts != NULL) + { + m_namesrvAddrChoosedLock.Unlock(); + return pTts; + } + } + } + + m_namesrvAddrChoosedLock.Unlock(); + } + + return NULL; +} + + +void TcpRemotingClient::handleTimerEvent() +{ + // every 1000ms + static unsigned long long lastTime = 0; + if (!m_stop && (int)(KPRUtil::GetCurrentTimeMillis() - lastTime) < s_CheckIntervalMillis) + { + return; + } + + try + { + lastTime = KPRUtil::GetCurrentTimeMillis(); + + this->scanResponseTable(); + + this->scanCloseTransportTable(); + } + catch(...) + { + RMQ_ERROR("scanResponseTable exception"); + } +} + + +void TcpRemotingClient::scanCloseTransportTable() +{ + if (m_closeTransportTable.empty()) + { + return; + } + + if (m_closeTransportTableLock.TryLock()) + { + std::list<TcpTransport*>::iterator it; + for( it = m_closeTransportTable.begin(); it != m_closeTransportTable.end(); ) + { + TcpTransport* pTts = *it; + long long diffTime = KPRUtil::GetCurrentTimeMillis() - pTts->getLastSendRecvTime(); + if (m_stop || (diffTime > 5000)) + { + RMQ_WARN("remove close connection, %lld, {%s}", diffTime, pTts->getServerAddr().c_str()); + it = m_closeTransportTable.erase(it); + delete pTts; + } + else + { + it++; + } + } + m_closeTransportTableLock.Unlock(); + } + else + { + RMQ_WARN("m_closeTransportTableLock TryLock fail"); + } +} + + +void TcpRemotingClient::scanResponseTable() +{ + kpr::ScopedWLock<kpr::RWMutex> lock(m_responseTableLock); + for(typeof(m_responseTable.begin()) it = m_responseTable.begin();it != m_responseTable.end();) + { + long long diffTime = KPRUtil::GetCurrentTimeMillis() - it->second->getBeginTimestamp(); + if (m_stop || (diffTime > it->second->getTimeoutMillis() + 2000)) + { + RMQ_WARN("remove timeout request, %lld, %s", diffTime, it->second->toString().c_str()); + try + { + it->second->executeInvokeCallback(); + } + catch(...) + { + RMQ_WARN("scanResponseTable, operationComplete Exception"); + } + it->second->release(); + m_responseTable.erase(it++); + } + else + { + it++; + } + } +} + +void TcpRemotingClient::processData(TcpTransport* pTts, std::string* pData) +{ + //printMsg("processData", pData->c_str(), pData->size()); + RemotingCommand* pCmd = RemotingCommand::decode(pData->data(), (int)pData->size()); + if (pCmd == NULL) + { + RMQ_ERROR("invalid data format, len:%d, data: %s", (int)pData->size(), pData->c_str()); + return; + } + + int code = 0; + if (pCmd->isResponseType()) + { + kpr::ScopedRLock<kpr::RWMutex> lock(m_responseTableLock); + std::map<int, ResponseFuturePtr>::iterator it = m_responseTable.find(pCmd->getOpaque()); + if (it != m_responseTable.end()) + { + code = it->second->getRequestCode(); + } + else + { + RMQ_WARN("receive response, but not matched any request, maybe timeout or oneway, pCmd: %s", pCmd->toString().c_str()); + delete pCmd; + return; + } + } + else + { + code = pCmd->getCode(); + } + + pCmd->makeCustomHeader(code, pData->data(), (int)pData->size()); + if (pCmd->isResponseType()) + { + RMQ_DEBUG("[NETWORK]: RECV => {%s}, {opaque=%d, requst.code=%s(%d), response.code=%s(%d)}, %s", + pTts->getServerAddr().c_str(), pCmd->getOpaque(), getMQRequestCodeString(code), code, + getMQResponseCodeString(pCmd->getCode()), pCmd->getCode(), pCmd->toString().c_str()); + } + else + { + RMQ_DEBUG("[NETWORK]: RECV => {%s}, {opaque=%d, requst.code=%s(%d)}, %s", + pTts->getServerAddr().c_str(), pCmd->getOpaque(), + getMQRequestCodeString(code), code, pCmd->toString().c_str()); + } + + processMessageReceived(pTts, pCmd); +} + +RemotingCommand* TcpRemotingClient::invokeSyncImpl(TcpTransport* pTts, + RemotingCommand* pRequest, + int timeoutMillis) +{ + ResponseFuturePtr pResponseFuture = new ResponseFuture( + pRequest->getCode(), pRequest->getOpaque(), timeoutMillis, + NULL, true, NULL); + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_responseTableLock); + m_responseTable.insert(std::pair<int, ResponseFuturePtr>(pRequest->getOpaque(), pResponseFuture)); + } + + int ret = sendCmd(pTts, pRequest, timeoutMillis); + if (ret == 0) + { + pResponseFuture->setSendRequestOK(true); + } + else + { + pResponseFuture->setSendRequestOK(false); + pResponseFuture->putResponse(NULL); + RMQ_WARN("send a pRequest command to channel <%s> failed.", pTts->getServerAddr().c_str()); + } + + RemotingCommand* pResponse = pResponseFuture->waitResponse(timeoutMillis); + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_responseTableLock); + std::map<int, ResponseFuturePtr>::iterator it = m_responseTable.find(pRequest->getOpaque()); + if (it != m_responseTable.end()) + { + m_responseTable.erase(it); + } + } + + if (pResponse == NULL) + { + if (ret == 0) + { + std::stringstream oss; + oss << "wait response on the channel <" << pTts->getServerAddr() << "> timeout," << timeoutMillis << "ms"; + THROW_MQEXCEPTION(RemotingTimeoutException, oss.str(), -1); + } + else + { + std::stringstream oss; + oss << "send request to <" << pTts->getServerAddr() << "> failed"; + THROW_MQEXCEPTION(RemotingSendRequestException, oss.str(), -1); + } + } + + return pResponse; +} + +void TcpRemotingClient::invokeAsyncImpl(TcpTransport* pTts, + RemotingCommand* pRequest, + int timeoutMillis, + InvokeCallback* pInvokeCallback) +{ + bool acquired = m_semaphoreAsync.Wait(timeoutMillis); + if (acquired) + { + ResponseFuturePtr pResponseFuture = new ResponseFuture( + pRequest->getCode(), pRequest->getOpaque(), timeoutMillis, + pInvokeCallback, false, &m_semaphoreAsync); + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_responseTableLock); + m_responseTable.insert(std::pair<int, ResponseFuturePtr>(pRequest->getOpaque(), pResponseFuture)); + } + + int ret = sendCmd(pTts, pRequest, timeoutMillis); + if (ret == 0) + { + pResponseFuture->setSendRequestOK(true); + } + else + { + pResponseFuture->setSendRequestOK(false); + pResponseFuture->putResponse(NULL); + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_responseTableLock); + std::map<int, ResponseFuturePtr>::iterator it = m_responseTable.find(pRequest->getOpaque()); + if (it != m_responseTable.end()) + { + m_responseTable.erase(it); + } + } + + try + { + pResponseFuture->executeInvokeCallback(); + } + catch (...) + { + RMQ_WARN("executeInvokeCallback exception"); + } + pResponseFuture->release(); + + RMQ_WARN("send a pRequest command to channel <%s> failed, requet: %s", + pTts->getServerAddr().c_str(), pRequest->toString().c_str()); + } + } + else + { + if (timeoutMillis <= 0) + { + THROW_MQEXCEPTION(RemotingTooMuchRequestException, "invokeAsyncImpl invoke too fast", -1); + } + else + { + std::string info = RocketMQUtil::str2fmt( + "invokeAsyncImpl wait semaphore timeout, %dms, semaphoreAsyncValue: %d, request: %s", + timeoutMillis, + m_semaphoreAsync.GetValue(), + pRequest->toString().c_str() + ); + RMQ_WARN("%s", info.c_str()); + THROW_MQEXCEPTION(RemotingTimeoutException, info, -1); + } + } + + return; +} + +int TcpRemotingClient::invokeOnewayImpl(TcpTransport* pTts, + RemotingCommand* pRequest, + int timeoutMillis) +{ + pRequest->markOnewayRPC(); + + bool acquired = m_semaphoreOneway.Wait(timeoutMillis); + if (acquired) + { + int ret = sendCmd(pTts, pRequest, timeoutMillis); + m_semaphoreOneway.Release(); + if (ret != 0) + { + RMQ_WARN("send a pRequest command to channel <%s> failed, requet: %s", + pTts->getServerAddr().c_str(), pRequest->toString().c_str()); + THROW_MQEXCEPTION(RemotingSendRequestException, std::string("send request to <") + pTts->getServerAddr() + "> fail", -1); + } + } + else + { + if (timeoutMillis <= 0) + { + THROW_MQEXCEPTION(RemotingTooMuchRequestException, "invokeOnewayImpl invoke too fast", -1); + } + else + { + std::string info = RocketMQUtil::str2fmt( + "invokeOnewayImpl wait semaphore timeout, %dms, semaphoreAsyncValue: %d, request: %s", + timeoutMillis, + m_semaphoreAsync.GetValue(), + pRequest->toString().c_str() + ); + RMQ_WARN("%s", info.c_str()); + THROW_MQEXCEPTION(RemotingTimeoutException, info, -1); + } + } + + return 0; +} + +void TcpRemotingClient::processMessageReceived(TcpTransport* pTts, RemotingCommand* pCmd) +{ + try + { + switch (pCmd->getType()) + { + case REQUEST_COMMAND: + processRequestCommand(pTts, pCmd); + break; + case RESPONSE_COMMAND: + processResponseCommand(pTts, pCmd); + break; + default: + break; + } + } + catch (std::exception& e) + { + RMQ_ERROR("processMessageReceived catch Exception: %s", e.what()); + } + catch (...) + { + RMQ_ERROR("processMessageReceived catch Exception"); + } +} + +void TcpRemotingClient::processRequestCommand(TcpTransport* pTts, RemotingCommand* pCmd) +{ + RMQ_DEBUG("receive request from server, cmd: %s", pCmd->toString().c_str()); + RemotingCommandPtr pResponse = NULL; + std::map<int, TcpRequestProcessor*>::iterator it = m_processorTable.find(pCmd->getCode()); + if (it != m_processorTable.end()) + { + try + { + pResponse = it->second->processRequest(pTts, pCmd); + if (!pCmd->isOnewayRPC()) + { + if (pResponse.ptr() != NULL) + { + pResponse->setOpaque(pCmd->getOpaque()); + pResponse->markResponseType(); + int ret = this->sendCmd(pTts, pResponse, 3000); + if (ret != 0) + { + RMQ_ERROR("process request over, but response failed"); + } + } + else + { + // ignore + } + } + } + catch (const std::exception& e) + { + RMQ_ERROR("process request exception:%s", e.what()); + if (!pCmd->isOnewayRPC()) + { + pResponse = RemotingCommand::createResponseCommand( + SYSTEM_ERROR_VALUE, e.what(), NULL); + pResponse->setOpaque(pCmd->getOpaque()); + int ret = this->sendCmd(pTts, pResponse, 3000); + if (ret != 0) + { + RMQ_ERROR("process request over, but response failed"); + } + } + } + } + else + { + pResponse = RemotingCommand::createResponseCommand( + REQUEST_CODE_NOT_SUPPORTED_VALUE, "request type not supported", NULL); + pResponse->setOpaque(pCmd->getOpaque()); + int ret = this->sendCmd(pTts, pResponse, 3000); + if (ret != 0) + { + RMQ_ERROR("process request over, but pResponse failed"); + } + } + delete pCmd; +} + +void TcpRemotingClient::processResponseCommand(TcpTransport* pTts, RemotingCommand* pCmd) +{ + ResponseFuturePtr res = NULL; + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_responseTableLock); + std::map<int, ResponseFuturePtr>::iterator it = m_responseTable.find(pCmd->getOpaque()); + if (it != m_responseTable.end()) + { + res = it->second; + res->release(); + m_responseTable.erase(it); + } + } + + if (res) + { + res->putResponse(pCmd); + res->executeInvokeCallback(); + } + else + { + RMQ_WARN("receive response, but not matched any request, cmd: %s", pCmd->toString().c_str()); + delete pCmd; + } +} + +int TcpRemotingClient::sendCmd(TcpTransport* pTts, RemotingCommand* pRequest, int timeoutMillis) +{ + pRequest->encode(); + int ret = pTts->sendData(pRequest->getData(), pRequest->getDataLen(), timeoutMillis); + + RMQ_DEBUG("[NETWORK]: SEND => {%s}, {opaque=%d, request.code=%s(%d), ret=%d, timeout=%d}, %s", + pTts->getServerAddr().c_str(), pRequest->getOpaque(), + getMQRequestCodeString(pRequest->getCode()), pRequest->getCode(), + ret, timeoutMillis, pRequest->toString().c_str()); + + return ret; +} + +void TcpRemotingClient::removeTTS(TcpTransport* pTts, bool isDisConnected) +{ + if (pTts) + { + RMQ_INFO("[NETWORK]: %s => {%s}", isDisConnected ? "DISCONNECT" : "CLOSE", + pTts->getServerAddr().c_str()); + + bool bNeedClear = false; + m_epoller.del(pTts->getSocket(), (long long)(pTts->getServerAddr().c_str()), 0); + { + kpr::ScopedWLock<kpr::RWMutex> lock(m_transportTableLock); + std::map<std::string , TcpTransport*>::iterator it = m_transportTable.find(pTts->getServerAddr()); + if (it != m_transportTable.end()) + { + if (it->second == pTts) + { + m_transportTable.erase(it); + bNeedClear = true; + } + } + } + + if (bNeedClear) + { + kpr::ScopedLock<kpr::Mutex> lock(m_closeTransportTableLock); + m_closeTransportTable.push_back(pTts); + } + } +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/TcpRemotingClient.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/transport/TcpRemotingClient.h b/rocketmq-client4cpp/src/transport/TcpRemotingClient.h new file mode 100755 index 0000000..d8bbf96 --- /dev/null +++ b/rocketmq-client4cpp/src/transport/TcpRemotingClient.h @@ -0,0 +1,152 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __TCPREMOTINGCLIENT_H__ +#define __TCPREMOTINGCLIENT_H__ + +#include <map> +#include <string> +#include <list> + +#include "RocketMQClient.h" +#include "SocketUtil.h" +#include "Epoller.h" +#include "RemotingCommand.h" +#include "Thread.h" +#include "ThreadPool.h" +#include "ThreadPoolWork.h" +#include "RemoteClientConfig.h" +#include "TcpTransport.h" +#include "ScopedLock.h" +#include "KPRUtil.h" +#include "Semaphore.h" +#include "ResponseFuture.h" + +namespace rmq +{ + class TcpTransport; + class InvokeCallback; + class TcpRemotingClient; + class ResponseFuture; + class TcpRequestProcessor; + + class ProcessDataWork : public kpr::ThreadPoolWork + { + public: + ProcessDataWork(TcpRemotingClient* pClient, TcpTransport* pTts, std::string* pData); + virtual ~ProcessDataWork(); + virtual void Do(); + + private: + TcpRemotingClient* m_pClient; + TcpTransport* m_pTts; + std::string* m_pData; + }; + typedef kpr::RefHandleT<ProcessDataWork> ProcessDataWorkPtr; + + class TcpRemotingClient + { + class EventThread : public kpr::Thread + { + public: + EventThread(TcpRemotingClient& client) + : Thread("NetThread"), m_client(client) + { + } + + void Run() + { + m_client.run(); + } + + private : + TcpRemotingClient& m_client; + }; + friend class EventThread; + friend class ProcessDataWork; + + public: + static const int s_LockTimeoutMillis = 3000; + static const int s_CheckIntervalMillis = 1000; + static const int s_ClientOnewaySemaphoreValue = 2048; + static const int s_ClientAsyncSemaphoreValue = 2048; + + public: + TcpRemotingClient(const RemoteClientConfig& config); + virtual ~TcpRemotingClient(); + virtual void start(); + virtual void shutdown(); + + void updateNameServerAddressList(const std::vector<std::string>& addrs); + std::vector<std::string> getNameServerAddressList(); + void registerProcessor(int requestCode, TcpRequestProcessor* pProcessor); + + RemotingCommand* invokeSync(const std::string& addr, RemotingCommand* pRequest, int timeoutMillis) ; + void invokeAsync(const std::string& addr, RemotingCommand* pRequest, int timeoutMillis, InvokeCallback* invokeCallback); + int invokeOneway(const std::string& addr, RemotingCommand* pRequest, int timeoutMillis); + + private: + void run(); + int sendCmd(TcpTransport* pTts, RemotingCommand* pRequest, int timeoutMillis); + void removeTTS(TcpTransport* pTts, bool isDisConnected = false); + void processData(TcpTransport* pTts, std::string* data); + void handleTimerEvent(); + void scanResponseTable(); + void scanCloseTransportTable(); + + void processMessageReceived(TcpTransport* pTts, RemotingCommand* pCmd); + void processRequestCommand(TcpTransport* pTts, RemotingCommand* pCmd); + void processResponseCommand(TcpTransport* pTts, RemotingCommand* pCmd); + + TcpTransport* getAndCreateTransport(const std::string& addr, int timeoutMillis); + TcpTransport* getAndCreateNameserverTransport(int timeoutMillis); + TcpTransport* createTransport(const std::string& addr, int timeoutMillis); + + RemotingCommand* invokeSyncImpl(TcpTransport* pTts, RemotingCommand* pRequest, int timeoutMillis) ; + void invokeAsyncImpl(TcpTransport* pTts, RemotingCommand* pRequest, int timeoutMillis, InvokeCallback* pInvokeCallback); + int invokeOnewayImpl(TcpTransport* pTts, RemotingCommand* pRequest, int timeoutMillis); + + private: + bool m_stop; + kpr::Epoller m_epoller; + RemoteClientConfig m_config; + + kpr::Semaphore m_semaphoreOneway; + kpr::Semaphore m_semaphoreAsync; + + std::map<std::string , TcpTransport*> m_transportTable; + kpr::RWMutex m_transportTableLock; + + std::list<TcpTransport*> m_closeTransportTable; + kpr::Mutex m_closeTransportTableLock; + + std::map<int, ResponseFuturePtr> m_responseTable; + kpr::RWMutex m_responseTableLock; + + std::vector<std::string> m_namesrvAddrList; + kpr::AtomicInteger m_namesrvIndex; + kpr::AtomicReference<std::string> m_namesrvAddrChoosed; + kpr::Mutex m_namesrvAddrChoosedLock; + + kpr::ThreadPoolPtr m_pNetThreadPool; + kpr::ThreadPtr m_pEventThread; + + TcpRequestProcessor* m_pDefaultRequestProcessor; + std::map<int, TcpRequestProcessor*> m_processorTable; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/TcpRequestProcessor.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/transport/TcpRequestProcessor.h b/rocketmq-client4cpp/src/transport/TcpRequestProcessor.h new file mode 100755 index 0000000..6ac02d1 --- /dev/null +++ b/rocketmq-client4cpp/src/transport/TcpRequestProcessor.h @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __TCPREQUESTPROCESSOR_H__ +#define __TCPREQUESTPROCESSOR_H__ + +namespace rmq +{ + class RemotingCommand; + class TcpTransport; + + class TcpRequestProcessor + { + public: + virtual ~TcpRequestProcessor() {} + virtual RemotingCommand* processRequest(TcpTransport* pTts, RemotingCommand* pRequest) = 0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/transport/TcpTransport.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/transport/TcpTransport.cpp b/rocketmq-client4cpp/src/transport/TcpTransport.cpp new file mode 100755 index 0000000..858adf3 --- /dev/null +++ b/rocketmq-client4cpp/src/transport/TcpTransport.cpp @@ -0,0 +1,387 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "TcpTransport.h" + +#include <stdio.h> +#include <stdlib.h> +#include <memory.h> +#include <errno.h> +#include <assert.h> +#include "KPRUtil.h" +#include "SocketUtil.h" +#include "Epoller.h" +#include "ScopedLock.h" + +namespace rmq +{ + +const int DEFAULT_SHRINK_COUNT = 32; +const int DEFAULT_RECV_BUFFER_SIZE = 1024 * 16; + +TcpTransport::TcpTransport(std::map<std::string, std::string>& config) + : m_sfd(-1), + m_state(CLIENT_STATE_UNINIT), + m_pRecvBuf(NULL), + m_recvBufSize(DEFAULT_RECV_BUFFER_SIZE), + m_recvBufUsed(0), + m_shrinkMax(DEFAULT_RECV_BUFFER_SIZE), + m_shrinkCheckCnt(DEFAULT_SHRINK_COUNT) +{ + std::map<std::string, std::string>::iterator it = config.find("tcp.transport.recvBufferSize"); + if (it != config.end()) + { + m_recvBufSize = atoi(it->second.c_str()); + } + + it = config.find("tcp.transport.shrinkCheckMax"); + if (it != config.end()) + { + m_shrinkCheckCnt = atoi(it->second.c_str()); + } + + if (SocketInit() != 0) + { + m_state = CLIENT_STATE_UNINIT; + } + + m_pRecvBuf = (char*)malloc(m_recvBufSize); + m_state = (NULL == m_pRecvBuf) ? CLIENT_STATE_UNINIT : CLIENT_STATE_INITED; + m_lastSendRecvTime = KPRUtil::GetCurrentTimeMillis(); +} + +TcpTransport::~TcpTransport() +{ + close(); + + if (m_sfd != INVALID_SOCKET) + { + ::shutdown(m_sfd, SD_BOTH); + ::closesocket(m_sfd); + m_sfd = INVALID_SOCKET; + } + + if (m_pRecvBuf) + { + free(m_pRecvBuf); + } + + SocketUninit(); +} + + +int TcpTransport::connect(const std::string& serverAddr, int timeoutMillis) +{ + long long endTime = KPRUtil::GetCurrentTimeMillis() + timeoutMillis; + if (m_state == CLIENT_STATE_UNINIT) + { + return CLIENT_ERROR_INIT; + } + + if (isConnected()) + { + if (serverAddr.compare(m_serverAddr) == 0) + { + return CLIENT_ERROR_SUCCESS; + } + else + { + close(); + } + } + + short port; + std::string strAddr; + + if (!SplitURL(serverAddr, strAddr, port)) + { + return CLIENT_ERROR_INVALID_URL; + } + + struct sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons(port); + + sa.sin_addr.s_addr = inet_addr(strAddr.c_str()); + m_sfd = (int)socket(AF_INET, SOCK_STREAM, 0); + + if (MakeSocketNonblocking(m_sfd) == -1) + { + ::closesocket(m_sfd); + return CLIENT_ERROR_CONNECT; + } + + if (SetTcpNoDelay(m_sfd) == -1) + { + ::closesocket(m_sfd); + return CLIENT_ERROR_CONNECT; + } + + if (::connect(m_sfd, (struct sockaddr*)&sa, sizeof(sockaddr)) == -1) + { + int err = NET_ERROR; + if (err == WSAEWOULDBLOCK || err == WSAEINPROGRESS) + { + kpr::Epoller epoller(false); + epoller.create(1); + epoller.add(m_sfd, 0, EPOLLOUT); + int iRetCode = epoller.wait(endTime - KPRUtil::GetCurrentTimeMillis()); + if (iRetCode <= 0) + { + ::closesocket(m_sfd); + return CLIENT_ERROR_CONNECT; + } + else if (iRetCode == 0) + { + ::closesocket(m_sfd); + return CLIENT_ERROR_CONNECT; + } + + const epoll_event& ev = epoller.get(0); + if (ev.events & EPOLLERR || ev.events & EPOLLHUP) + { + ::closesocket(m_sfd); + return CLIENT_ERROR_CONNECT; + } + + int opterr = 0; + socklen_t errlen = sizeof(opterr); + if (getsockopt(m_sfd, SOL_SOCKET, SO_ERROR, &opterr, &errlen) == -1 || opterr) + { + ::closesocket(m_sfd); + return CLIENT_ERROR_CONNECT; + } + } + else + { + ::closesocket(m_sfd); + return CLIENT_ERROR_CONNECT; + } + } + + m_serverAddr = serverAddr; + m_state = CLIENT_STATE_CONNECTED; + m_recvBufUsed = 0; + m_lastSendRecvTime = KPRUtil::GetCurrentTimeMillis(); + + return CLIENT_ERROR_SUCCESS; +} + + +bool TcpTransport::isConnected() +{ + return m_state == CLIENT_STATE_CONNECTED; +} + +void TcpTransport::close() +{ + if (m_state == CLIENT_STATE_CONNECTED) + { + m_state = CLIENT_STATE_DISCONNECT; + } +} + +int TcpTransport::sendData(const char* pBuffer, int len, int timeOut) +{ + kpr::ScopedLock<kpr::Mutex> lock(m_sendLock); + return sendOneMsg(pBuffer, len, timeOut > 0 ? timeOut : 0); +} + +int TcpTransport::sendOneMsg(const char* pBuffer, int len, int nTimeOut) +{ + int pos = 0; + long long endTime = KPRUtil::GetCurrentTimeMillis() + nTimeOut; + + while (len > 0 && m_state == CLIENT_STATE_CONNECTED) + { + int ret = send(m_sfd, pBuffer + pos, len, 0); + if (ret > 0) + { + len -= ret; + pos += ret; + } + else if (ret == 0) + { + close(); + break; + } + else + { + int err = NET_ERROR; + if (err == WSAEWOULDBLOCK || err == EAGAIN) + { + kpr::Epoller epoller(false); + epoller.create(1); + epoller.add(m_sfd, 0, EPOLLOUT); + int iRetCode = epoller.wait(endTime - KPRUtil::GetCurrentTimeMillis()); + if (iRetCode <= 0) + { + close(); + break; + } + else if (iRetCode == 0) + { + close(); + break; + } + + const epoll_event& ev = epoller.get(0); + if (ev.events & EPOLLERR || ev.events & EPOLLHUP) + { + close(); + break; + } + } + else + { + close(); + break; + } + } + } + m_lastSendRecvTime = KPRUtil::GetCurrentTimeMillis(); + + return (len == 0) ? 0 : -1; +} + + +int TcpTransport::recvMsg() +{ + int ret = recv(m_sfd, m_pRecvBuf + m_recvBufUsed, m_recvBufSize - m_recvBufUsed, 0); + + if (ret > 0) + { + m_recvBufUsed += ret; + } + else if (ret == 0) + { + close(); + ret = -1; + } + else if (ret < 0) + { + int err = NET_ERROR; + if (err == WSAEWOULDBLOCK || err == EAGAIN || err == EINTR) + { + ret = 0; + } + else + { + close(); + } + } + m_lastSendRecvTime = KPRUtil::GetCurrentTimeMillis(); + + return ret; +} + +bool TcpTransport::resizeBuf(int nNewSize) +{ + char* newbuf = (char*)realloc(m_pRecvBuf, nNewSize); + if (!newbuf) + { + return false; + } + + m_pRecvBuf = newbuf; + m_recvBufSize = nNewSize; + + return true; +} + +void TcpTransport::tryShrink(int MsgLen) +{ + m_shrinkMax = MsgLen > m_shrinkMax ? MsgLen : m_shrinkMax; + if (m_shrinkCheckCnt == 0) + { + m_shrinkCheckCnt = DEFAULT_SHRINK_COUNT; + if (m_recvBufSize > m_shrinkMax) + { + resizeBuf(m_shrinkMax); + } + } + else + { + m_shrinkCheckCnt--; + } +} + +int TcpTransport::getMsgSize(const char* pBuf) +{ + int len = 0; + memcpy(&len, pBuf, sizeof(int)); + + return ntohl(len) + 4; +} + +int TcpTransport::recvData(std::list<std::string*>& dataList) +{ + int ret = recvMsg(); + processData(dataList); + return ret; +} + +void TcpTransport::processData(std::list<std::string*>& dataList) +{ + while (m_recvBufUsed > int(sizeof(int))) + { + int msgLen = 0; + msgLen = getMsgSize(m_pRecvBuf); + if (msgLen > m_recvBufSize) + { + if (resizeBuf(msgLen)) + { + m_shrinkCheckCnt = DEFAULT_SHRINK_COUNT; + } + break; + } + else + { + tryShrink(msgLen); + } + + if (m_recvBufUsed >= msgLen) + { + std::string* data = new std::string; + data->assign(m_pRecvBuf, msgLen); + dataList.push_back(data); + m_recvBufUsed -= msgLen; + + memmove(m_pRecvBuf, m_pRecvBuf + msgLen, m_recvBufUsed); + } + else + { + break; + } + } +} + +SOCKET TcpTransport::getSocket() +{ + return m_sfd; +} + +std::string& TcpTransport::getServerAddr() +{ + return m_serverAddr; +} + +unsigned long long TcpTransport::getLastSendRecvTime() +{ + return m_lastSendRecvTime; +} + + +}
