This is an automated email from the ASF dual-hosted git repository. ifplusor pushed a commit to branch re_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit 40882dc35e8f6bc62edd2fb03864ef031e72c8b3 Author: James Yin <[email protected]> AuthorDate: Thu Mar 11 15:59:04 2021 +0800 feat: support ipv6 --- src/ClientRemotingProcessor.cpp | 6 +- src/MQClientConfigImpl.hpp | 3 +- src/common/UtilAll.cpp | 53 +------ src/common/UtilAll.h | 8 -- src/consumer/DefaultMQPushConsumerImpl.cpp | 4 +- src/message/MessageBatch.cpp | 3 +- src/message/MessageClientIDSetter.cpp | 40 ++++-- src/message/MessageClientIDSetter.h | 2 +- src/message/MessageDecoder.cpp | 42 +++--- src/message/MessageDecoder.h | 7 +- src/message/MessageExtImpl.cpp | 26 ++-- src/message/MessageExtImpl.h | 4 +- src/message/MessageId.h | 16 +-- src/transport/EventLoop.cpp | 6 +- src/transport/SocketUtil.cpp | 217 ++++++++++++++++++----------- src/transport/SocketUtil.h | 45 ++++-- 16 files changed, 258 insertions(+), 224 deletions(-) diff --git a/src/ClientRemotingProcessor.cpp b/src/ClientRemotingProcessor.cpp index cf97306..32366d6 100644 --- a/src/ClientRemotingProcessor.cpp +++ b/src/ClientRemotingProcessor.cpp @@ -16,9 +16,9 @@ */ #include "ClientRemotingProcessor.h" -#include "MessageDecoder.h" #include "MQProtos.h" #include "MessageAccessor.hpp" +#include "MessageDecoder.h" #include "MessageSysFlag.h" #include "RequestFutureTable.h" #include "SocketUtil.h" @@ -153,11 +153,11 @@ RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* r msg->set_store_timestamp(requestHeader->store_timestamp()); if (!requestHeader->born_host().empty()) { - msg->set_born_host(string2SocketAddress(requestHeader->born_host())); + msg->set_born_host(StringToSockaddr(requestHeader->born_host())); } if (!requestHeader->store_host().empty()) { - msg->set_store_host(string2SocketAddress(requestHeader->store_host())); + msg->set_store_host(StringToSockaddr(requestHeader->store_host())); } auto body = request->body(); diff --git a/src/MQClientConfigImpl.hpp b/src/MQClientConfigImpl.hpp index 59b4601..b9ad503 100644 --- a/src/MQClientConfigImpl.hpp +++ b/src/MQClientConfigImpl.hpp @@ -22,6 +22,7 @@ #include "MQClientConfig.h" #include "NamespaceUtil.h" +#include "SocketUtil.h" #include "UtilAll.h" namespace rocketmq { @@ -45,7 +46,7 @@ class MQClientConfigImpl : virtual public MQClientConfig { std::string buildMQClientId() const override { std::string clientId; - clientId.append(UtilAll::getLocalAddress()); // clientIP + clientId.append(GetLocalAddress()); // clientIP clientId.append("@"); clientId.append(instance_name_); // instanceName if (!unit_name_.empty()) { diff --git a/src/common/UtilAll.cpp b/src/common/UtilAll.cpp index 2cbe00a..731d8aa 100644 --- a/src/common/UtilAll.cpp +++ b/src/common/UtilAll.cpp @@ -40,9 +40,6 @@ namespace rocketmq { -std::string UtilAll::sLocalHostName; -std::string UtilAll::sLocalIpAddress; - bool UtilAll::try_lock_for(std::timed_mutex& mutex, long timeout) { auto now = std::chrono::steady_clock::now(); auto deadline = now + std::chrono::milliseconds(timeout); @@ -157,7 +154,7 @@ bool UtilAll::isBlank(const std::string& str) { } bool UtilAll::SplitURL(const std::string& serverURL, std::string& addr, short& nPort) { - auto pos = serverURL.find(':'); + auto pos = serverURL.find_last_of(':'); if (pos == std::string::npos) { return false; } @@ -228,54 +225,6 @@ int UtilAll::Split(std::vector<std::string>& ret_, const std::string& strIn, con return ret_.size(); } -std::string UtilAll::getLocalHostName() { - if (sLocalHostName.empty()) { - char name[1024]; - if (::gethostname(name, sizeof(name)) != 0) { - return null; - } - sLocalHostName.append(name, strlen(name)); - } - return sLocalHostName; -} - -std::string UtilAll::getLocalAddress() { - if (sLocalIpAddress.empty()) { - auto hostname = getLocalHostName(); - if (!hostname.empty()) { - try { - sLocalIpAddress = socketAddress2String(lookupNameServers(hostname)); - } catch (std::exception& e) { - LOG_WARN(e.what()); - sLocalIpAddress = "127.0.0.1"; - } - } - } - return sLocalIpAddress; -} - -uint32_t UtilAll::getIP() { - std::string ip = UtilAll::getLocalAddress(); - if (ip.empty()) { - return 0; - } - - char* ip_str = new char[ip.length() + 1]; - std::strncpy(ip_str, ip.c_str(), ip.length()); - ip_str[ip.length()] = '\0'; - - int i = 3; - uint32_t nResult = 0; - for (char* token = std::strtok(ip_str, "."); token != nullptr && i >= 0; token = std::strtok(nullptr, ".")) { - uint32_t n = std::atoi(token); - nResult |= n << (8 * i--); - } - - delete[] ip_str; - - return nResult; -} - std::string UtilAll::getHomeDirectory() { #ifndef WIN32 char* home_env = std::getenv("HOME"); diff --git a/src/common/UtilAll.h b/src/common/UtilAll.h index 8ad49b0..63989f7 100644 --- a/src/common/UtilAll.h +++ b/src/common/UtilAll.h @@ -112,10 +112,6 @@ class UtilAll { static int Split(std::vector<std::string>& ret_, const std::string& strIn, const char sep); static int Split(std::vector<std::string>& ret_, const std::string& strIn, const std::string& sep); - static std::string getLocalHostName(); - static std::string getLocalAddress(); - static uint32_t getIP(); - static std::string getHomeDirectory(); static void createDirectory(std::string const& dir); static bool existDirectory(std::string const& dir); @@ -138,10 +134,6 @@ class UtilAll { // Returns true on success. // Returns false on failure.. static bool ReplaceFile(const std::string& from_path, const std::string& to_path); - - private: - static std::string sLocalHostName; - static std::string sLocalIpAddress; }; template <typename T> diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp index 11a12a1..bf00cc2 100644 --- a/src/consumer/DefaultMQPushConsumerImpl.cpp +++ b/src/consumer/DefaultMQPushConsumerImpl.cpp @@ -536,8 +536,8 @@ bool DefaultMQPushConsumerImpl::sendMessageBack(MessageExtPtr msg, int delay_lev try { msg->set_topic(NamespaceUtil::wrapNamespace(client_config_->name_space(), msg->topic())); - std::string brokerAddr = brokerName.empty() ? socketAddress2String(msg->store_host()) - : client_instance_->findBrokerAddressInPublish(brokerName); + std::string brokerAddr = + brokerName.empty() ? msg->store_host_string() : client_instance_->findBrokerAddressInPublish(brokerName); client_instance_->getMQClientAPIImpl()->consumerSendMessageBack( brokerAddr, msg, getDefaultMQPushConsumerConfig()->group_name(), delay_level, 5000, diff --git a/src/message/MessageBatch.cpp b/src/message/MessageBatch.cpp index 4030fd6..2ba0c18 100644 --- a/src/message/MessageBatch.cpp +++ b/src/message/MessageBatch.cpp @@ -16,8 +16,9 @@ */ #include "MessageBatch.h" -#include "MessageDecoder.h" +#include "MQException.h" #include "MessageClientIDSetter.h" +#include "MessageDecoder.h" namespace rocketmq { diff --git a/src/message/MessageClientIDSetter.cpp b/src/message/MessageClientIDSetter.cpp index dda45df..5560ba1 100644 --- a/src/message/MessageClientIDSetter.cpp +++ b/src/message/MessageClientIDSetter.cpp @@ -25,7 +25,8 @@ #include <unistd.h> #endif -#include "ByteOrder.h" +#include "ByteBuffer.hpp" +#include "SocketUtil.h" #include "UtilAll.h" namespace rocketmq { @@ -33,16 +34,28 @@ namespace rocketmq { MessageClientIDSetter::MessageClientIDSetter() { std::srand((uint32_t)std::time(NULL)); - uint32_t pid = ByteOrderUtil::NorminalBigEndian(static_cast<uint32_t>(UtilAll::getProcessId())); - uint32_t ip = ByteOrderUtil::NorminalBigEndian(UtilAll::getIP()); - uint32_t random_num = ByteOrderUtil::NorminalBigEndian(static_cast<uint32_t>(std::rand())); - - char bin_buf[10]; - std::memcpy(bin_buf + 2, &pid, 4); - std::memcpy(bin_buf, &ip, 4); - std::memcpy(bin_buf + 6, &random_num, 4); + std::unique_ptr<ByteBuffer> buffer; + sockaddr* addr = GetSelfIP(); + if (addr != nullptr) { + buffer.reset(ByteBuffer::allocate(SockaddrSize(addr) + 2 + 4)); + if (addr->sa_family == AF_INET) { + auto* sin = (struct sockaddr_in*)addr; + buffer->put(ByteArray(reinterpret_cast<char*>(&sin->sin_addr), kIPv4AddrSize)); + } else if (addr->sa_family == AF_INET6) { + auto* sin6 = (struct sockaddr_in6*)addr; + buffer->put(ByteArray(reinterpret_cast<char*>(&sin6->sin6_addr), kIPv6AddrSize)); + } else { + (void)buffer.release(); + } + } + if (buffer == nullptr) { + buffer.reset(ByteBuffer::allocate(4 + 2 + 4)); + buffer->putInt(UtilAll::currentTimeMillis()); + } + buffer->putShort(UtilAll::getProcessId()); + buffer->putInt(std::rand()); - fix_string_ = UtilAll::bytes2string(bin_buf, 10); + fixed_string_ = UtilAll::bytes2string(buffer->array(), buffer->position()); setStartTime(UtilAll::currentTimeMillis()); @@ -93,11 +106,8 @@ std::string MessageClientIDSetter::createUniqueID() { uint32_t period = ByteOrderUtil::NorminalBigEndian(static_cast<uint32_t>(current - start_time_)); uint16_t seqid = ByteOrderUtil::NorminalBigEndian(counter_++); - char bin_buf[6]; - std::memcpy(bin_buf, &period, 4); - std::memcpy(bin_buf + 4, &seqid, 2); - - return fix_string_ + UtilAll::bytes2string(bin_buf, 6); + return fixed_string_ + UtilAll::bytes2string(reinterpret_cast<char*>(&period), sizeof(period)) + + UtilAll::bytes2string(reinterpret_cast<char*>(&seqid), sizeof(seqid)); } } // namespace rocketmq diff --git a/src/message/MessageClientIDSetter.h b/src/message/MessageClientIDSetter.h index 222115e..e688786 100644 --- a/src/message/MessageClientIDSetter.h +++ b/src/message/MessageClientIDSetter.h @@ -68,7 +68,7 @@ class MessageClientIDSetter { uint64_t next_start_time_; std::atomic<uint16_t> counter_; - std::string fix_string_; + std::string fixed_string_; }; } // namespace rocketmq diff --git a/src/message/MessageDecoder.cpp b/src/message/MessageDecoder.cpp index a36d3bf..08df407 100644 --- a/src/message/MessageDecoder.cpp +++ b/src/message/MessageDecoder.cpp @@ -20,14 +20,18 @@ #include <sstream> // std::stringstream #ifndef WIN32 -#include <netinet/in.h> // struct sockaddr, sockaddr_in, sockaddr_in6 +#include <arpa/inet.h> // htons +#include <netinet/in.h> // sockaddr_in, sockaddr_in6 +#else +#include "Winsock2.h" #endif #include "ByteOrder.h" #include "Logging.h" -#include "MessageExtImpl.h" #include "MessageAccessor.hpp" +#include "MessageExtImpl.h" #include "MessageSysFlag.h" +#include "SocketUtil.h" #include "UtilAll.h" static const char NAME_VALUE_SEPARATOR = 1; @@ -36,37 +40,35 @@ static const char PROPERTY_SEPARATOR = 2; namespace rocketmq { std::string MessageDecoder::createMessageId(const struct sockaddr* sa, int64_t offset) { - int msgIDLength = sa->sa_family == AF_INET ? 16 : 28; - std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::allocate(msgIDLength)); + int msgIdLength = IpaddrSize(sa) + /* port field size */ 4 + sizeof(offset); + std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::allocate(msgIdLength)); if (sa->sa_family == AF_INET) { struct sockaddr_in* sin = (struct sockaddr_in*)sa; - byteBuffer->put(ByteArray((char*)&sin->sin_addr, 4)); - byteBuffer->putInt(ByteOrderUtil::NorminalBigEndian(sin->sin_port)); + byteBuffer->put(ByteArray(reinterpret_cast<char*>(&sin->sin_addr), kIPv4AddrSize)); + byteBuffer->putInt(ntohs(sin->sin_port)); } else { struct sockaddr_in6* sin6 = (struct sockaddr_in6*)sa; - byteBuffer->put(ByteArray((char*)&sin6->sin6_addr, 16)); - byteBuffer->putInt(ByteOrderUtil::NorminalBigEndian(sin6->sin6_port)); + byteBuffer->put(ByteArray(reinterpret_cast<char*>(&sin6->sin6_addr), kIPv6AddrSize)); + byteBuffer->putInt(ntohs(sin6->sin6_port)); } byteBuffer->putLong(offset); byteBuffer->flip(); - return UtilAll::bytes2string(byteBuffer->array(), msgIDLength); + return UtilAll::bytes2string(byteBuffer->array(), msgIdLength); } MessageId MessageDecoder::decodeMessageId(const std::string& msgId) { - size_t ip_length = msgId.length() == 32 ? 4 * 2 : 16 * 2; + size_t ip_length = msgId.length() == 32 ? kIPv4AddrSize * 2 : kIPv6AddrSize * 2; ByteArray byteArray(ip_length / 2); std::string ip = msgId.substr(0, ip_length); UtilAll::string2bytes(byteArray.array(), ip); std::string port = msgId.substr(ip_length, 8); - // uint32_t portInt = ByteOrderUtil::NorminalBigEndian<uint32_t>(std::stoul(port, nullptr, 16)); uint32_t portInt = std::stoul(port, nullptr, 16); - auto* sin = ipPort2SocketAddress(byteArray, portInt); + auto* sin = IPPortToSockaddr(byteArray, portInt); std::string offset = msgId.substr(ip_length + 8); - // uint64_t offsetInt = ByteOrderUtil::NorminalBigEndian<uint64_t>(std::stoull(offset, nullptr, 16)); uint64_t offsetInt = std::stoull(offset, nullptr, 16); return MessageId(sin, offsetInt); @@ -123,22 +125,22 @@ MessageExtPtr MessageDecoder::decode(ByteBuffer& byteBuffer, bool readBody, bool msgExt->set_born_timestamp(bornTimeStamp); // 10 BORNHOST - int bornhostIPLength = (sysFlag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 ? 4 : 16; - ByteArray bornHost(bornhostIPLength); - byteBuffer.get(bornHost, 0, bornhostIPLength); + int bornHostLength = (sysFlag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize; + ByteArray bornHost(bornHostLength); + byteBuffer.get(bornHost, 0, bornHostLength); int32_t bornPort = byteBuffer.getInt(); - msgExt->set_born_host(ipPort2SocketAddress(bornHost, bornPort)); + msgExt->set_born_host(IPPortToSockaddr(bornHost, bornPort)); // 11 STORETIMESTAMP int64_t storeTimestamp = byteBuffer.getLong(); msgExt->set_store_timestamp(storeTimestamp); // 12 STOREHOST - int storehostIPLength = (sysFlag & MessageSysFlag::STOREHOST_V6_FLAG) == 0 ? 4 : 16; - ByteArray storeHost(bornhostIPLength); + int storehostIPLength = (sysFlag & MessageSysFlag::STOREHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize; + ByteArray storeHost(bornHostLength); byteBuffer.get(storeHost, 0, storehostIPLength); int32_t storePort = byteBuffer.getInt(); - msgExt->set_store_host(ipPort2SocketAddress(storeHost, storePort)); + msgExt->set_store_host(IPPortToSockaddr(storeHost, storePort)); // 13 RECONSUMETIMES int32_t reconsumeTimes = byteBuffer.getInt(); diff --git a/src/message/MessageDecoder.h b/src/message/MessageDecoder.h index ad0b82d..5c45d8e 100644 --- a/src/message/MessageDecoder.h +++ b/src/message/MessageDecoder.h @@ -17,8 +17,13 @@ #ifndef ROCKETMQ_MESSAGE_MESSAGEDECODER_H_ #define ROCKETMQ_MESSAGE_MESSAGEDECODER_H_ +#ifndef WIN32 +#include <sys/socket.h> // sockaddr +#else +#include <Winsock2.h> +#endif + #include "ByteBuffer.hpp" -#include "MQException.h" #include "MQMessageExt.h" #include "MessageId.h" diff --git a/src/message/MessageExtImpl.cpp b/src/message/MessageExtImpl.cpp index f43ac35..cfd6b99 100644 --- a/src/message/MessageExtImpl.cpp +++ b/src/message/MessageExtImpl.cpp @@ -44,20 +44,14 @@ MessageExtImpl::MessageExtImpl(int queueId, commit_log_offset_(0), sys_flag_(0), born_timestamp_(bornTimestamp), - born_host_(nullptr), + born_host_(SockaddrToStorage(bornHost)), store_timestamp_(storeTimestamp), - store_host_(nullptr), + store_host_(SockaddrToStorage(storeHost)), reconsume_times_(3), prepared_transaction_offset_(0), - msg_id_(msgId) { - born_host_ = copySocketAddress(born_host_, bornHost); - store_host_ = copySocketAddress(store_host_, storeHost); -} + msg_id_(msgId) {} -MessageExtImpl::~MessageExtImpl() { - free(born_host_); - free(store_host_); -} +MessageExtImpl::~MessageExtImpl() = default; TopicFilterType MessageExtImpl::parseTopicFilterType(int32_t sysFlag) { if ((sysFlag & MessageSysFlag::MULTI_TAGS_FLAG) == MessageSysFlag::MULTI_TAGS_FLAG) { @@ -123,15 +117,15 @@ void MessageExtImpl::set_born_timestamp(int64_t bornTimestamp) { } const struct sockaddr* MessageExtImpl::born_host() const { - return born_host_; + return reinterpret_cast<sockaddr*>(born_host_.get()); } std::string MessageExtImpl::born_host_string() const { - return socketAddress2String(born_host_); + return SockaddrToString(born_host()); } void MessageExtImpl::set_born_host(const struct sockaddr* bornHost) { - born_host_ = copySocketAddress(born_host_, bornHost); + born_host_ = SockaddrToStorage(bornHost); } int64_t MessageExtImpl::store_timestamp() const { @@ -143,15 +137,15 @@ void MessageExtImpl::set_store_timestamp(int64_t storeTimestamp) { } const struct sockaddr* MessageExtImpl::store_host() const { - return store_host_; + return reinterpret_cast<sockaddr*>(store_host_.get()); } std::string MessageExtImpl::store_host_string() const { - return socketAddress2String(store_host_); + return SockaddrToString(store_host()); } void MessageExtImpl::set_store_host(const struct sockaddr* storeHost) { - store_host_ = copySocketAddress(store_host_, storeHost); + store_host_ = SockaddrToStorage(storeHost); } const std::string& MessageExtImpl::msg_id() const { diff --git a/src/message/MessageExtImpl.h b/src/message/MessageExtImpl.h index 5d85cea..fc44471 100644 --- a/src/message/MessageExtImpl.h +++ b/src/message/MessageExtImpl.h @@ -94,9 +94,9 @@ class MessageExtImpl : public MessageImpl, // base int64_t commit_log_offset_; int32_t sys_flag_; int64_t born_timestamp_; - struct sockaddr* born_host_; + std::unique_ptr<sockaddr_storage> born_host_; int64_t store_timestamp_; - struct sockaddr* store_host_; + std::unique_ptr<sockaddr_storage> store_host_; int32_t reconsume_times_; int64_t prepared_transaction_offset_; std::string msg_id_; diff --git a/src/message/MessageId.h b/src/message/MessageId.h index 64e4603..e9a501b 100644 --- a/src/message/MessageId.h +++ b/src/message/MessageId.h @@ -27,29 +27,29 @@ namespace rocketmq { class MessageId { public: MessageId() : MessageId(nullptr, 0) {} - MessageId(struct sockaddr* address, int64_t offset) : address_(nullptr), offset_(offset) { setAddress(address); } + MessageId(const struct sockaddr* address, int64_t offset) : address_(SockaddrToStorage(address)), offset_(offset) {} - MessageId(const MessageId& other) : MessageId(other.address_, other.offset_) {} - MessageId(MessageId&& other) : address_(other.address_), offset_(other.offset_) { other.address_ = nullptr; } + MessageId(const MessageId& other) : MessageId(other.getAddress(), other.offset_) {} + MessageId(MessageId&& other) : address_(std::move(other.address_)), offset_(other.offset_) {} - virtual ~MessageId() { std::free(address_); } + virtual ~MessageId() = default; MessageId& operator=(const MessageId& other) { if (&other != this) { - setAddress(other.address_); + setAddress(other.getAddress()); this->offset_ = other.offset_; } return *this; } - const struct sockaddr* getAddress() const { return address_; } - void setAddress(struct sockaddr* address) { address_ = copySocketAddress(address_, address); } + const struct sockaddr* getAddress() const { return reinterpret_cast<sockaddr*>(address_.get()); } + void setAddress(const struct sockaddr* address) { address_ = SockaddrToStorage(address); } int64_t getOffset() const { return offset_; } void setOffset(int64_t offset) { offset_ = offset; } private: - struct sockaddr* address_; + std::unique_ptr<sockaddr_storage> address_; int64_t offset_; }; diff --git a/src/transport/EventLoop.cpp b/src/transport/EventLoop.cpp index 85ea113..5dce2d1 100644 --- a/src/transport/EventLoop.cpp +++ b/src/transport/EventLoop.cpp @@ -207,9 +207,9 @@ int BufferEvent::connect(const std::string& addr) { } try { - auto* sa = string2SocketAddress(addr); // resolve domain - peer_addr_port_ = socketAddress2String(sa); - return bufferevent_socket_connect(buffer_event_, sa, sockaddr_size(sa)); + auto* sa = StringToSockaddr(addr); // resolve domain + peer_addr_port_ = SockaddrToString(sa); + return bufferevent_socket_connect(buffer_event_, sa, SockaddrSize(sa)); } catch (const std::exception& e) { LOG_ERROR_NEW("can not connect to {}, {}", addr, e.what()); return -1; diff --git a/src/transport/SocketUtil.cpp b/src/transport/SocketUtil.cpp index 0470088..f207263 100644 --- a/src/transport/SocketUtil.cpp +++ b/src/transport/SocketUtil.cpp @@ -16,73 +16,115 @@ */ #include "SocketUtil.h" -#include <cstdlib> // std::realloc -#include <cstring> // std::memset, std::memcpy +#include <cstdlib> // std::abort +#include <cstring> // std::memcpy, std::memset -#include <sstream> -#include <stdexcept> - -#include <event2/event.h> +#include <iostream> +#include <stdexcept> // std::invalid_argument, std::runtime_error +#include <string> #ifndef WIN32 -#include <netdb.h> -#include <unistd.h> +#include <arpa/inet.h> // htons +#include <unistd.h> // gethostname +#else +#include <Winsock2.h> #endif -#include "ByteOrder.h" +#include <event2/event.h> + #include "MQException.h" -#include "UtilAll.h" namespace rocketmq { -union sockaddr_union { - struct sockaddr_in sin; - struct sockaddr_in6 sin6; -}; +std::unique_ptr<sockaddr_storage> SockaddrToStorage(const sockaddr* src) { + if (src == nullptr) { + return nullptr; + } + std::unique_ptr<sockaddr_storage> ss(new sockaddr_storage); + std::memcpy(ss.get(), src, SockaddrSize(src)); + return ss; +} -thread_local static sockaddr_union sin_buf; +thread_local sockaddr_storage ss_buffer; -struct sockaddr* ipPort2SocketAddress(const ByteArray& ip, uint16_t port) { - if (ip.size() == 4) { - struct sockaddr_in* sin = &sin_buf.sin; +sockaddr* IPPortToSockaddr(const ByteArray& ip, uint16_t port) { + sockaddr_storage* ss = &ss_buffer; + if (ip.size() == kIPv4AddrSize) { + auto* sin = reinterpret_cast<sockaddr_in*>(ss); sin->sin_family = AF_INET; - sin->sin_port = ByteOrderUtil::NorminalBigEndian<uint16_t>(port); - ByteOrderUtil::Read<decltype(sin->sin_addr)>(&sin->sin_addr, ip.array()); - return (struct sockaddr*)sin; - } else if (ip.size() == 16) { - struct sockaddr_in6* sin6 = &sin_buf.sin6; + sin->sin_port = htons(port); + std::memcpy(&sin->sin_addr, ip.array(), kIPv4AddrSize); + } else if (ip.size() == kIPv6AddrSize) { + auto* sin6 = reinterpret_cast<sockaddr_in6*>(&ss); sin6->sin6_family = AF_INET6; - sin6->sin6_port = ByteOrderUtil::NorminalBigEndian<uint16_t>(port); - ByteOrderUtil::Read<decltype(sin6->sin6_addr)>(&sin6->sin6_addr, ip.array()); - return (struct sockaddr*)sin6; + sin6->sin6_port = htons(port); + std::memcpy(&sin6->sin6_addr, ip.array(), kIPv6AddrSize); + } else { + throw std::invalid_argument("invalid ip size"); } - return nullptr; + return reinterpret_cast<sockaddr*>(ss); } -struct sockaddr* string2SocketAddress(const std::string& addr) { +sockaddr* StringToSockaddr(const std::string& addr) { + if (addr.empty()) { + throw std::invalid_argument("invalid address"); + } + std::string::size_type start_pos = addr[0] == '/' ? 1 : 0; - auto colon_pos = addr.find_last_of(":"); - std::string host = addr.substr(start_pos, colon_pos - start_pos); - std::string port = addr.substr(colon_pos + 1, addr.length() - colon_pos); - auto* sa = lookupNameServers(host); - if (sa != nullptr) { - if (sa->sa_family == AF_INET) { - auto* sin = (struct sockaddr_in*)sa; - sin->sin_port = htons((uint16_t)std::stoi(port)); - } else { - auto* sin6 = (struct sockaddr_in6*)sa; - sin6->sin6_port = htons((uint16_t)std::stoi(port)); + auto colon_pos = addr.find_last_of(':'); + auto bracket_pos = addr.find_last_of(']'); + if (bracket_pos != std::string::npos) { + // ipv6 address + if (addr.at(start_pos) != '[') { + throw std::invalid_argument("invalid address"); + } + if (colon_pos == std::string::npos) { + throw std::invalid_argument("invalid address"); } + if (colon_pos < bracket_pos) { + // have not port + if (bracket_pos != addr.size() - 1) { + throw std::invalid_argument("invalid address"); + } + colon_pos = addr.size(); + } else if (colon_pos != bracket_pos + 1) { + throw std::invalid_argument("invalid address"); + } + } else if (colon_pos == std::string::npos) { + // have not port + colon_pos = addr.size(); } + + decltype(bracket_pos) fix_bracket = bracket_pos == std::string::npos ? 0 : 1; + std::string host = addr.substr(start_pos + fix_bracket, colon_pos - start_pos - fix_bracket * 2); + auto* sa = LookupNameServers(host); + + std::string port = colon_pos >= addr.size() ? "0" : addr.substr(colon_pos + 1); + uint32_t n = std::stoul(port); + if (n > std::numeric_limits<uint16_t>::max()) { + throw std::out_of_range("port is to large"); + } + uint16_t port_num = htons(static_cast<uint16_t>(n)); + + if (sa->sa_family == AF_INET) { + auto* sin = reinterpret_cast<sockaddr_in*>(sa); + sin->sin_port = port_num; + } else if (sa->sa_family == AF_INET6) { + auto* sin6 = reinterpret_cast<sockaddr_in6*>(sa); + sin6->sin6_port = port_num; + } else { + throw std::runtime_error("don't support non-inet address families"); + } + return sa; } /** - * converts an address from network format to presentation format (a.b.c.d) + * converts an address from network format to presentation format */ -std::string socketAddress2String(const struct sockaddr* addr) { +std::string SockaddrToString(const sockaddr* addr) { if (nullptr == addr) { - return "127.0.0.1"; + return std::string(); } char buf[128]; @@ -90,38 +132,40 @@ std::string socketAddress2String(const struct sockaddr* addr) { uint16_t port = 0; if (addr->sa_family == AF_INET) { - auto* sin = (struct sockaddr_in*)addr; - if (nullptr != evutil_inet_ntop(AF_INET, &sin->sin_addr, buf, sizeof(buf))) { - address = buf; + const auto* sin = reinterpret_cast<const sockaddr_in*>(addr); + if (nullptr == evutil_inet_ntop(AF_INET, &sin->sin_addr, buf, sizeof(buf))) { + throw std::runtime_error("can not convert AF_INET address to text form"); } + address = buf; port = ntohs(sin->sin_port); } else if (addr->sa_family == AF_INET6) { - auto* sin6 = (struct sockaddr_in6*)addr; - if (nullptr != evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buf, sizeof(buf))) { - address = buf; + const auto* sin6 = reinterpret_cast<const sockaddr_in6*>(addr); + if (nullptr == evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buf, sizeof(buf))) { + throw std::runtime_error("can not convert AF_INET6 address to text form"); } + address = buf; port = ntohs(sin6->sin6_port); } else { - throw std::runtime_error("don't support non-inet Address families."); + throw std::runtime_error("don't support non-inet address families"); } - if (!address.empty() && port != 0) { - if (addr->sa_family == AF_INET6) { - address = "[" + address + "]"; - } - address += ":" + UtilAll::to_string(port); + if (addr->sa_family == AF_INET6) { + address = "[" + address + "]"; + } + if (port != 0) { + address += ":" + std::to_string(port); } return address; } -struct sockaddr* lookupNameServers(const std::string& hostname) { +sockaddr* LookupNameServers(const std::string& hostname) { if (hostname.empty()) { - return nullptr; + throw std::invalid_argument("invalid hostname"); } - struct evutil_addrinfo hints; - struct evutil_addrinfo* answer = NULL; + evutil_addrinfo hints; + evutil_addrinfo* answer = nullptr; /* Build the hints to tell getaddrinfo how to act. */ std::memset(&hints, 0, sizeof(hints)); @@ -131,48 +175,63 @@ struct sockaddr* lookupNameServers(const std::string& hostname) { hints.ai_flags = EVUTIL_AI_ADDRCONFIG; /* Only return addresses we can use. */ // Look up the hostname. - int err = evutil_getaddrinfo(hostname.c_str(), NULL, &hints, &answer); + int err = evutil_getaddrinfo(hostname.c_str(), nullptr, &hints, &answer); if (err != 0) { - std::string info = "Failed to resolve host name(" + hostname + "): " + evutil_gai_strerror(err); + std::string info = "Failed to resolve hostname(" + hostname + "): " + evutil_gai_strerror(err); THROW_MQEXCEPTION(UnknownHostException, info, -1); } - struct sockaddr* sin = nullptr; + sockaddr_storage* ss = &ss_buffer; - for (struct evutil_addrinfo* ai = answer; ai != NULL; ai = ai->ai_next) { + bool hit = false; + for (struct evutil_addrinfo* ai = answer; ai != nullptr; ai = ai->ai_next) { auto* ai_addr = ai->ai_addr; if (ai_addr->sa_family != AF_INET && ai_addr->sa_family != AF_INET6) { continue; } - sin = (struct sockaddr*)&sin_buf; - std::memcpy(sin, ai_addr, sockaddr_size(ai_addr)); + std::memcpy(ss, ai_addr, SockaddrSize(ai_addr)); + hit = true; break; } evutil_freeaddrinfo(answer); - return sin; + if (!hit) { + throw std::runtime_error("hostname is non-inet address family"); + } + + return reinterpret_cast<sockaddr*>(ss); } -struct sockaddr* copySocketAddress(struct sockaddr* dst, const struct sockaddr* src) { - if (src != nullptr) { - if (dst == nullptr || dst->sa_family != src->sa_family) { - dst = (struct sockaddr*)std::realloc(dst, sizeof(union sockaddr_union)); - } - std::memcpy(dst, src, sockaddr_size(src)); - } else { - free(dst); - dst = nullptr; +sockaddr* GetSelfIP() { + try { + return LookupNameServers(GetLocalHostname()); + } catch (const UnknownHostException& e) { + return LookupNameServers("localhost"); } - return dst; } -uint64_t h2nll(uint64_t v) { - return ByteOrderUtil::NorminalBigEndian(v); +const std::string& GetLocalHostname() { + static std::string local_hostname = []() { + char name[1024]; + if (::gethostname(name, sizeof(name)) != 0) { + return std::string(); + } + return std::string(name); + }(); + return local_hostname; } -uint64_t n2hll(uint64_t v) { - return ByteOrderUtil::NorminalBigEndian(v); +const std::string& GetLocalAddress() { + static std::string local_address = []() { + try { + return SockaddrToString(GetSelfIP()); + } catch (std::exception& e) { + std::cerr << e.what() << std::endl; + std::abort(); + } + }(); + return local_address; } } // namespace rocketmq diff --git a/src/transport/SocketUtil.h b/src/transport/SocketUtil.h index be18a38..6bfe535 100644 --- a/src/transport/SocketUtil.h +++ b/src/transport/SocketUtil.h @@ -17,13 +17,14 @@ #ifndef ROCKETMQ_TRANSPORT_SOCKETUTIL_H_ #define ROCKETMQ_TRANSPORT_SOCKETUTIL_H_ -#include <cstdint> +#include <cstddef> // size_t +#include <cstdint> // uint16_t #include <string> #ifndef WIN32 -#include <arpa/inet.h> -#include <sys/socket.h> +#include <netinet/in.h> // sockaddr_in, AF_INET, sockaddr_in6, AF_INET6 +#include <sys/socket.h> // sockaddr, sockaddr_storage #else #include <Winsock2.h> #pragma comment(lib, "ws2_32.lib") @@ -33,21 +34,41 @@ namespace rocketmq { -static inline size_t sockaddr_size(const struct sockaddr* sa) { - return sa->sa_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); +const size_t kIPv4AddrSize = 4; +const size_t kIPv6AddrSize = 16; + +static inline size_t IpaddrSize(const sockaddr* sa) { + assert(sa != nullptr); + assert(sa->sa_family == AF_INET || sa->sa_family == AF_INET6); + return sa->sa_family == AF_INET6 ? kIPv6AddrSize : kIPv4AddrSize; +} + +static inline size_t IpaddrSize(const sockaddr_storage* ss) { + return IpaddrSize(reinterpret_cast<const sockaddr*>(ss)); +} + +static inline size_t SockaddrSize(const sockaddr* sa) { + assert(sa != nullptr); + assert(sa->sa_family == AF_INET || sa->sa_family == AF_INET6); + return sa->sa_family == AF_INET6 ? sizeof(sockaddr_in6) : sizeof(sockaddr_in); +} + +static inline size_t SockaddrSize(const sockaddr_storage* ss) { + return SockaddrSize(reinterpret_cast<const sockaddr*>(ss)); } -struct sockaddr* ipPort2SocketAddress(const ByteArray& ip, uint16_t port); +std::unique_ptr<sockaddr_storage> SockaddrToStorage(const sockaddr* src); -struct sockaddr* string2SocketAddress(const std::string& addr); -std::string socketAddress2String(const struct sockaddr* addr); +sockaddr* IPPortToSockaddr(const ByteArray& ip, uint16_t port); -struct sockaddr* lookupNameServers(const std::string& hostname); +sockaddr* StringToSockaddr(const std::string& addr); +std::string SockaddrToString(const sockaddr* addr); -struct sockaddr* copySocketAddress(struct sockaddr* dst, const struct sockaddr* src); +sockaddr* LookupNameServers(const std::string& hostname); -uint64_t h2nll(uint64_t v); -uint64_t n2hll(uint64_t v); +sockaddr* GetSelfIP(); +const std::string& GetLocalHostname(); +const std::string& GetLocalAddress(); } // namespace rocketmq
