This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit 65d8959f3607ca863ce2d1e44ef71ace1ab3f1f8 Author: Gavin Chou <[email protected]> AuthorDate: Mon Apr 22 12:23:15 2024 +0800 [feature](cloud) Add priority network support for meta-service registry (#33931) --- cloud/src/common/CMakeLists.txt | 1 + cloud/src/common/config.h | 8 + cloud/src/common/network_util.cpp | 238 +++++++++++++++++++++++ cloud/src/common/network_util.h | 34 ++++ cloud/src/meta-service/meta_server.cpp | 3 +- cloud/src/meta-service/meta_service_resource.cpp | 10 +- cloud/test/CMakeLists.txt | 4 + cloud/test/network_util_test.cpp | 49 +++++ 8 files changed, 340 insertions(+), 7 deletions(-) diff --git a/cloud/src/common/CMakeLists.txt b/cloud/src/common/CMakeLists.txt index 166ad5052da..a2abfb075bf 100644 --- a/cloud/src/common/CMakeLists.txt +++ b/cloud/src/common/CMakeLists.txt @@ -13,6 +13,7 @@ set(COMMON_FILES encryption_util.cpp metric.cpp kms.cpp + network_util.cpp ) if (USE_JEMALLOC) diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 280faf9014c..73b8580c1f6 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -170,4 +170,12 @@ CONF_String(kerberos_krb5_conf_path, "/etc/krb5.conf"); CONF_mBool(enable_distinguish_hdfs_path, "true"); +// Declare a selection strategy for servers that have many IPs. +// Note that there should be at most one IP matching this list. +// This is a list in semicolon-delimited format, in CIDR notation, +// e.g. 10.10.10.0/24 +// e.g. 10.10.10.0/24;192.168.0.1/24 +// If no IP matches this list, a fallback IP is used (the loopback IP, or else the IP bound to the hostname). +CONF_String(priority_networks, ""); + } // namespace doris::cloud::config diff --git a/cloud/src/common/network_util.cpp b/cloud/src/common/network_util.cpp new file mode 100644 index 00000000000..9f1b085163d --- /dev/null +++ b/cloud/src/common/network_util.cpp @@ -0,0 +1,238 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "common/network_util.h" + +#include <arpa/inet.h> +#include <butil/endpoint.h> +#include <butil/strings/string_split.h> +#include <ifaddrs.h> +#include <netdb.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/types.h> + +#include <sstream> +#include <vector> + +#include "common/logging.h" + +namespace doris::cloud { + +class CIDR { +public: + CIDR() : address_(0), netmask_(0xffffffff) {} + bool reset(const std::string& cidr_str) { + address_ = 0; + netmask_ = 0xffffffff; + + // check if have mask + std::string cidr_format_str = cidr_str; + int32_t have_mask = cidr_str.find("/"); + if (have_mask == -1) { + cidr_format_str.assign(cidr_str + "/32"); + } + VLOG_DEBUG << "cidr format str: " << cidr_format_str; + + std::vector<std::string> cidr_items; + butil::SplitString(cidr_format_str, '/', &cidr_items); + if (cidr_items.size() != 2) { + LOG(WARNING) << "wrong CIDR format. network=" << cidr_str; + return false; + } + + if (cidr_items[1].empty()) { + LOG(WARNING) << "wrong CIDR mask format. 
network=" << cidr_str; + return false; + } + + char* endptr = nullptr; + int32_t mask_length = strtol(cidr_items[1].c_str(), &endptr, 10); + if (errno != 0 && mask_length == 0) { + char errmsg[64]; + // Ignore unused return value + auto ret = strerror_r(errno, errmsg, 64); + LOG(WARNING) << "wrong CIDR mask format. network=" << cidr_str + << ", mask_length=" << mask_length << ", errno=" << errno + << ", errmsg=" << errmsg << ", strerror_r returns=" << ret; + return false; + } + if (mask_length <= 0 || mask_length > 32) { + LOG(WARNING) << "wrong CIDR mask format. network=" << cidr_str + << ", mask_length=" << mask_length; + return false; + } + + uint32_t address = 0; + if (!ip_to_int(cidr_items[0], &address)) { + LOG(WARNING) << "wrong CIDR IP value. network=" << cidr_str; + return false; + } + address_ = address; + + netmask_ = 0xffffffff; + netmask_ = netmask_ << (32 - mask_length); + return true; + } + bool contains(const std::string& ip) { + uint32_t ip_int = 0; + if (!ip_to_int(ip, &ip_int)) { + return false; + } + if ((address_ & netmask_) == (ip_int & netmask_)) { + return true; + } + return false; + } + +private: + bool ip_to_int(const std::string& ip_str, uint32_t* value) { + struct in_addr addr; + int flag = inet_aton(ip_str.c_str(), &addr); + if (flag == 0) { + return false; + } + *value = ntohl(addr.s_addr); + return true; + } + + uint32_t address_; + uint32_t netmask_; +}; + +class InetAddress { +public: + InetAddress(struct sockaddr* addr) { this->addr_ = *(struct sockaddr_in*)addr; } + bool is_address_v4() const { return addr_.sin_family == AF_INET; } + bool is_loopback_v4() const { + in_addr_t s_addr = addr_.sin_addr.s_addr; + return (ntohl(s_addr) & 0xFF000000) == 0x7F000000; + } + std::string get_host_address_v4() { + char addr_buf[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &(addr_.sin_addr), addr_buf, INET_ADDRSTRLEN); + return std::string(addr_buf); + } + +private: + struct sockaddr_in addr_; +}; + +static bool get_hosts_v4(std::vector<InetAddress>* 
hosts) { + ifaddrs* if_addrs = nullptr; + if (getifaddrs(&if_addrs)) { + std::stringstream ss; + char buf[64]; + LOG(FATAL) << "getifaddrs failed because " << strerror_r(errno, buf, sizeof(buf)); + return false; + } + + for (ifaddrs* if_addr = if_addrs; if_addr != nullptr; if_addr = if_addr->ifa_next) { + if (!if_addr->ifa_addr) { + continue; + } + if (if_addr->ifa_addr->sa_family == AF_INET) { // check it is IP4 + // is a valid IP4 Address + hosts->emplace_back(if_addr->ifa_addr); + } + } + + if (if_addrs != nullptr) { + freeifaddrs(if_addrs); + } + + return true; +} + +std::string get_local_ip(const std::string& priority_networks) { + std::string localhost_str = butil::my_ip_cstr(); + if (priority_networks == "") { + LOG(INFO) << "use butil::my_ip_cstr(), local host ip=" << localhost_str; + return localhost_str; + } + std::vector<CIDR> priority_cidrs; + LOG(INFO) << "priority CIDRs: " << priority_networks; + std::vector<std::string> cidr_strs; + butil::SplitString(priority_networks, ';', &cidr_strs); + for (auto& cidr_str : cidr_strs) { + CIDR cidr; + if (!cidr.reset(cidr_str)) { + LOG(FATAL) << "wrong cidr format. 
cidr_str=" << cidr_str; + return localhost_str; + } + priority_cidrs.push_back(cidr); + } + + std::vector<InetAddress> hosts; + if (!get_hosts_v4(&hosts)) { + LOG(FATAL) << "failed to getifaddrs"; + return localhost_str; + } + + if (hosts.empty()) { + LOG(FATAL) << "failed to get host"; + return localhost_str; + } + + auto is_in_prior_network = [&priority_cidrs](const std::string& ip) { + for (auto& cidr : priority_cidrs) { + if (cidr.contains(ip)) { + return true; + } + } + return false; + }; + + std::string loopback; + localhost_str = ""; + for (auto addr_it = hosts.begin(); addr_it != hosts.end(); ++addr_it) { + if ((*addr_it).is_address_v4()) { + VLOG_DEBUG << "check ip=" << addr_it->get_host_address_v4(); + if ((*addr_it).is_loopback_v4()) { + loopback = addr_it->get_host_address_v4(); + } else if (!priority_cidrs.empty()) { + if (is_in_prior_network(addr_it->get_host_address_v4())) { + localhost_str = addr_it->get_host_address_v4(); + + break; + } + } else { + localhost_str = addr_it->get_host_address_v4(); + break; + } + } + } + if (!localhost_str.empty()) { + LOG(INFO) << "local host ip=" << localhost_str; + return localhost_str; + } + + if (!loopback.empty()) { + localhost_str = loopback; + LOG(WARNING) << "fail to find one valid non-loopback address, use loopback address: " + << localhost_str; + } else { + localhost_str = butil::my_ip_cstr(); + LOG(WARNING) + << "fail to find valid address of priority cidrs in conf, use butil::my_ip_cstr(): " + << localhost_str; + } + + return localhost_str; +} + +} // namespace doris::cloud diff --git a/cloud/src/common/network_util.h b/cloud/src/common/network_util.h new file mode 100644 index 00000000000..ebd7ae2e9fc --- /dev/null +++ b/cloud/src/common/network_util.h @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <string> + +namespace doris::cloud { + +/** + * Gets the local IP with the given CIDR mask. + * * If priority_networks is empty, the IP bound to the hostname is returned + * * If no IP fits the priority_networks CIDR mask, the loopback IP is returned + * + * @param priority_networks CIDR mask, can be empty + * @return the IP string + */ +std::string get_local_ip(const std::string& priority_networks); + +} // namespace doris::cloud diff --git a/cloud/src/meta-service/meta_server.cpp b/cloud/src/meta-service/meta_server.cpp index da1d6ecac33..f9b4bca146d 100644 --- a/cloud/src/meta-service/meta_server.cpp +++ b/cloud/src/meta-service/meta_server.cpp @@ -30,6 +30,7 @@ #include "common/config.h" #include "common/metric.h" +#include "common/network_util.h" #include "common/sync_point.h" #include "common/util.h" #include "meta-service/keys.h" @@ -96,7 +97,7 @@ void MetaServer::stop() { void MetaServerRegister::prepare_registry(ServiceRegistryPB* reg) { using namespace std::chrono; auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); - std::string ip = butil::my_ip_cstr(); + std::string ip = get_local_ip(config::priority_networks); int32_t port = config::brpc_listen_port; std::string id = ip + ":" + std::to_string(port); ServiceRegistryPB::Item item; diff --git a/cloud/src/meta-service/meta_service_resource.cpp 
b/cloud/src/meta-service/meta_service_resource.cpp index e5b3720cd24..a6b676ba6db 100644 --- a/cloud/src/meta-service/meta_service_resource.cpp +++ b/cloud/src/meta-service/meta_service_resource.cpp @@ -29,6 +29,7 @@ #include "common/encryption_util.h" #include "common/logging.h" +#include "common/network_util.h" #include "common/string_util.h" #include "common/sync_point.h" #include "meta-service/keys.h" @@ -3270,12 +3271,9 @@ void notify_refresh_instance(std::shared_ptr<TxnKv> txn_kv, const std::string& i << " err=" << err; return; } - std::string self_endpoint; - if (config::hostname.empty()) { - self_endpoint = fmt::format("{}:{}", butil::my_ip_cstr(), config::brpc_listen_port); - } else { - self_endpoint = fmt::format("{}:{}", config::hostname, config::brpc_listen_port); - } + std::string self_endpoint = + config::hostname.empty() ? get_local_ip(config::priority_networks) : config::hostname; + self_endpoint = fmt::format("{}:{}", self_endpoint, config::brpc_listen_port); ServiceRegistryPB reg; reg.ParseFromString(val); diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt index 96cc5a4de42..331e85fab2e 100644 --- a/cloud/test/CMakeLists.txt +++ b/cloud/test/CMakeLists.txt @@ -49,6 +49,8 @@ add_executable(hdfs_accessor_test hdfs_accessor_test.cpp) add_executable(stopwatch_test stopwatch_test.cpp) +add_executable(network_util_test network_util_test.cpp) + message("Meta-service test dependencies: ${TEST_LINK_LIBS}") target_link_libraries(sync_point_test ${TEST_LINK_LIBS}) @@ -78,6 +80,8 @@ target_link_libraries(hdfs_accessor_test ${TEST_LINK_LIBS}) target_link_libraries(stopwatch_test ${TEST_LINK_LIBS}) +target_link_libraries(network_util_test ${TEST_LINK_LIBS}) + # FDB related tests need to be linked with libfdb_c set(FDB_LINKER_FLAGS "-lfdb_c -L${THIRDPARTY_DIR}/lib") diff --git a/cloud/test/network_util_test.cpp b/cloud/test/network_util_test.cpp new file mode 100644 index 00000000000..06f425f244f --- /dev/null +++ 
b/cloud/test/network_util_test.cpp @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "common/network_util.h" + +#include "common/config.h" +#include "gtest/gtest.h" + +int main(int argc, char** argv) { + doris::cloud::config::init(nullptr, true); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +TEST(NetWorkUtilTest, GetLocaHostTest) { + doris::cloud::config::priority_networks = ""; + // prepare an existed ip for test + auto ip = doris::cloud::get_local_ip(doris::cloud::config::priority_networks); + std::cout << "get ip: " << ip << " from butil::my_ip_cstr()" << std::endl; + + { // possible not match CIDR + doris::cloud::config::priority_networks = "127.0.0.1/8"; + ASSERT_EQ(doris::cloud::get_local_ip(doris::cloud::config::priority_networks), "127.0.0.1"); + } + + { // not match CIDR + doris::cloud::config::priority_networks = "1.2.3.4/8"; + ASSERT_EQ(doris::cloud::get_local_ip(doris::cloud::config::priority_networks), "127.0.0.1"); + } + + { // must match CIDR + doris::cloud::config::priority_networks = ip + "/16"; + ASSERT_EQ(doris::cloud::get_local_ip(doris::cloud::config::priority_networks), ip); + } +} 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
