This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4b3d27d9c57 [feature](cloud) Add priority network support for
meta-service registry (#33931)
4b3d27d9c57 is described below
commit 4b3d27d9c578c2c1d8c81c23f4c7b0422196f3d2
Author: Gavin Chou <[email protected]>
AuthorDate: Mon Apr 22 12:23:15 2024 +0800
[feature](cloud) Add priority network support for meta-service registry
(#33931)
---
cloud/src/common/CMakeLists.txt | 1 +
cloud/src/common/config.h | 8 +
cloud/src/common/network_util.cpp | 238 +++++++++++++++++++++++
cloud/src/common/network_util.h | 34 ++++
cloud/src/meta-service/meta_server.cpp | 3 +-
cloud/src/meta-service/meta_service_resource.cpp | 10 +-
cloud/test/CMakeLists.txt | 4 +
cloud/test/network_util_test.cpp | 49 +++++
8 files changed, 340 insertions(+), 7 deletions(-)
diff --git a/cloud/src/common/CMakeLists.txt b/cloud/src/common/CMakeLists.txt
index 166ad5052da..a2abfb075bf 100644
--- a/cloud/src/common/CMakeLists.txt
+++ b/cloud/src/common/CMakeLists.txt
@@ -13,6 +13,7 @@ set(COMMON_FILES
encryption_util.cpp
metric.cpp
kms.cpp
+ network_util.cpp
)
if (USE_JEMALLOC)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 280faf9014c..73b8580c1f6 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -170,4 +170,12 @@ CONF_String(kerberos_krb5_conf_path, "/etc/krb5.conf");
CONF_mBool(enable_distinguish_hdfs_path, "true");
+// Declares a selection strategy for servers that have multiple IPs.
+// Note that at most one IP should match this list.
+// This is a semicolon-delimited list in CIDR notation,
+// e.g. 10.10.10.0/24
+// e.g. 10.10.10.0/24;192.168.0.1/24
+// If no IP matches this rule, a random IP is used (usually it is the IP bound to hostname).
+CONF_String(priority_networks, "");
+
} // namespace doris::cloud::config
diff --git a/cloud/src/common/network_util.cpp
b/cloud/src/common/network_util.cpp
new file mode 100644
index 00000000000..9f1b085163d
--- /dev/null
+++ b/cloud/src/common/network_util.cpp
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/network_util.h"
+
+#include <arpa/inet.h>
+#include <butil/endpoint.h>
+#include <butil/strings/string_split.h>
+#include <ifaddrs.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include <sstream>
+#include <vector>
+
+#include "common/logging.h"
+
+namespace doris::cloud {
+
+class CIDR { // An IPv4 network: address and netmask, both kept in host byte order.
+public:
+ CIDR() : address_(0), netmask_(0xffffffff) {} // default: address 0 with an all-ones mask
+ bool reset(const std::string& cidr_str) { // Parses a.b.c.d[/len]; logs a warning and returns false on bad input.
+ address_ = 0;
+ netmask_ = 0xffffffff;
+
+ // Input without a slash is treated as a single host: /32 is appended.
+ std::string cidr_format_str = cidr_str;
+ int32_t have_mask = cidr_str.find("/"); // find() yields size_t; the check below relies on npos narrowing to -1
+ if (have_mask == -1) {
+ cidr_format_str.assign(cidr_str + "/32");
+ }
+ VLOG_DEBUG << "cidr format str: " << cidr_format_str;
+
+ std::vector<std::string> cidr_items;
+ butil::SplitString(cidr_format_str, '/', &cidr_items);
+ if (cidr_items.size() != 2) { // anything other than exactly address/length is rejected
+ LOG(WARNING) << "wrong CIDR format. network=" << cidr_str;
+ return false;
+ }
+
+ if (cidr_items[1].empty()) {
+ LOG(WARNING) << "wrong CIDR mask format. network=" << cidr_str;
+ return false;
+ }
+
+ char* endptr = nullptr;
+ int32_t mask_length = strtol(cidr_items[1].c_str(), &endptr, 10); // NOTE(review): errno is not cleared before this call and endptr is never inspected
+ if (errno != 0 && mask_length == 0) {
+ char errmsg[64];
+ // ret is streamed into the log below (auto: GNU strerror_r returns char*, XSI returns int)
+ auto ret = strerror_r(errno, errmsg, 64);
+ LOG(WARNING) << "wrong CIDR mask format. network=" << cidr_str
+ << ", mask_length=" << mask_length << ", errno=" <<
errno
+ << ", errmsg=" << errmsg << ", strerror_r returns="
<< ret;
+ return false;
+ }
+ if (mask_length <= 0 || mask_length > 32) { // only prefix lengths 1..32 are accepted; /0 is rejected
+ LOG(WARNING) << "wrong CIDR mask format. network=" << cidr_str
+ << ", mask_length=" << mask_length;
+ return false;
+ }
+
+ uint32_t address = 0;
+ if (!ip_to_int(cidr_items[0], &address)) {
+ LOG(WARNING) << "wrong CIDR IP value. network=" << cidr_str;
+ return false;
+ }
+ address_ = address; // stored unmasked; contains() applies the mask to both sides
+
+ netmask_ = 0xffffffff;
+ netmask_ = netmask_ << (32 - mask_length); // keeps the top mask_length bits set; shift amount is 0..31 here
+ return true;
+ }
+ bool contains(const std::string& ip) { // True when ip parses via inet_aton() and shares this network prefix.
+ uint32_t ip_int = 0;
+ if (!ip_to_int(ip, &ip_int)) {
+ return false;
+ }
+ if ((address_ & netmask_) == (ip_int & netmask_)) {
+ return true;
+ }
+ return false;
+ }
+
+private:
+ bool ip_to_int(const std::string& ip_str, uint32_t* value) { // Writes the host-byte-order value of ip_str to *value; false if inet_aton() rejects it.
+ struct in_addr addr;
+ int flag = inet_aton(ip_str.c_str(), &addr); // inet_aton() returns 0 for an invalid address
+ if (flag == 0) {
+ return false;
+ }
+ *value = ntohl(addr.s_addr);
+ return true;
+ }
+
+ uint32_t address_; // network address as parsed, host byte order
+ uint32_t netmask_; // netmask, host byte order
+};
+
+class InetAddress { // Holds a by-value copy of one socket address, read as an IPv4 sockaddr_in.
+public:
+ InetAddress(struct sockaddr* addr) { this->addr_ = *(struct
sockaddr_in*)addr; } // NOTE(review): copies a full sockaddr_in without checking sa_family; the visible caller only passes AF_INET entries
+ bool is_address_v4() const { return addr_.sin_family == AF_INET; }
+ bool is_loopback_v4() const { // True for any address in 127.0.0.0/8.
+ in_addr_t s_addr = addr_.sin_addr.s_addr;
+ return (ntohl(s_addr) & 0xFF000000) == 0x7F000000; // compare the top octet against 0x7F
+ }
+ std::string get_host_address_v4() { // Returns the address in text form as produced by inet_ntop().
+ char addr_buf[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, &(addr_.sin_addr), addr_buf, INET_ADDRSTRLEN); // NOTE(review): return value is not checked
+ return std::string(addr_buf);
+ }
+
+private:
+ struct sockaddr_in addr_; // copied in the constructor, so it does not alias the caller buffer
+};
+
+static bool get_hosts_v4(std::vector<InetAddress>* hosts) { // Appends every AF_INET address reported by getifaddrs() to *hosts; false if getifaddrs() fails.
+ ifaddrs* if_addrs = nullptr;
+ if (getifaddrs(&if_addrs)) { // non-zero return means failure
+ std::stringstream ss; // NOTE(review): ss is never used
+ char buf[64];
+ LOG(FATAL) << "getifaddrs failed because " << strerror_r(errno, buf,
sizeof(buf));
+ return false; // presumably unreachable if LOG(FATAL) aborts the process - TODO confirm
+ }
+
+ for (ifaddrs* if_addr = if_addrs; if_addr != nullptr; if_addr =
if_addr->ifa_next) {
+ if (!if_addr->ifa_addr) { // entries with a null address pointer are skipped
+ continue;
+ }
+ if (if_addr->ifa_addr->sa_family == AF_INET) { // IPv4 entries only
+ // InetAddress copies the sockaddr, so freeing the list below does not invalidate it
+ hosts->emplace_back(if_addr->ifa_addr);
+ }
+ }
+
+ if (if_addrs != nullptr) {
+ freeifaddrs(if_addrs); // release the list filled in by getifaddrs()
+ }
+
+ return true;
+}
+
+std::string get_local_ip(const std::string& priority_networks) { // Returns the first non-loopback IPv4 interface address inside one of the semicolon-separated CIDRs, with loopback and butil::my_ip_cstr() as fallbacks.
+ std::string localhost_str = butil::my_ip_cstr();
+ if (priority_networks == "") { // empty config: return what butil::my_ip_cstr() reports
+ LOG(INFO) << "use butil::my_ip_cstr(), local host ip=" <<
localhost_str;
+ return localhost_str;
+ }
+ std::vector<CIDR> priority_cidrs;
+ LOG(INFO) << "priority CIDRs: " << priority_networks;
+ std::vector<std::string> cidr_strs;
+ butil::SplitString(priority_networks, ';', &cidr_strs);
+ for (auto& cidr_str : cidr_strs) {
+ CIDR cidr;
+ if (!cidr.reset(cidr_str)) { // a single malformed entry is reported at FATAL level
+ LOG(FATAL) << "wrong cidr format. cidr_str=" << cidr_str;
+ return localhost_str;
+ }
+ priority_cidrs.push_back(cidr);
+ }
+
+ std::vector<InetAddress> hosts;
+ if (!get_hosts_v4(&hosts)) {
+ LOG(FATAL) << "failed to getifaddrs";
+ return localhost_str;
+ }
+
+ if (hosts.empty()) { // no IPv4 interface address was found at all
+ LOG(FATAL) << "failed to get host";
+ return localhost_str;
+ }
+
+ auto is_in_prior_network = [&priority_cidrs](const std::string& ip) { // true if any configured CIDR contains ip
+ for (auto& cidr : priority_cidrs) {
+ if (cidr.contains(ip)) {
+ return true;
+ }
+ }
+ return false;
+ };
+
+ std::string loopback;
+ localhost_str = ""; // cleared so that emptiness after the loop means no address was selected
+ for (auto addr_it = hosts.begin(); addr_it != hosts.end(); ++addr_it) {
+ if ((*addr_it).is_address_v4()) {
+ VLOG_DEBUG << "check ip=" << addr_it->get_host_address_v4();
+ if ((*addr_it).is_loopback_v4()) { // loopback is only remembered as a fallback, never matched against the CIDR list
+ loopback = addr_it->get_host_address_v4();
+ } else if (!priority_cidrs.empty()) { // holds whenever the split above produced at least one entry
+ if (is_in_prior_network(addr_it->get_host_address_v4())) {
+ localhost_str = addr_it->get_host_address_v4();
+
+ break; // first match in enumeration order wins
+ }
+ } else { // no CIDR entries: take the first non-loopback address
+ localhost_str = addr_it->get_host_address_v4();
+ break;
+ }
+ }
+ }
+ if (!localhost_str.empty()) {
+ LOG(INFO) << "local host ip=" << localhost_str;
+ return localhost_str;
+ }
+
+ if (!loopback.empty()) { // nothing matched: fall back to a loopback address if one was seen
+ localhost_str = loopback;
+ LOG(WARNING) << "fail to find one valid non-loopback address, use
loopback address: "
+ << localhost_str;
+ } else { // no loopback address either: fall back to butil::my_ip_cstr()
+ localhost_str = butil::my_ip_cstr();
+ LOG(WARNING)
+ << "fail to find valid address of priority cidrs in conf, use
butil::my_ip_cstr(): "
+ << localhost_str;
+ }
+
+ return localhost_str;
+}
+
+} // namespace doris::cloud
diff --git a/cloud/src/common/network_util.h b/cloud/src/common/network_util.h
new file mode 100644
index 00000000000..ebd7ae2e9fc
--- /dev/null
+++ b/cloud/src/common/network_util.h
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+namespace doris::cloud {
+
+/**
+ * Gets the local IPv4 address, preferring one inside the given CIDR ranges.
+ * * If priority_networks is empty, the IP reported by butil::my_ip_cstr() is returned
+ * * If no IP fits any priority_networks CIDR, the loopback IP is returned when one exists, else butil::my_ip_cstr()
+ *
+ * @param priority_networks semicolon-delimited list of CIDRs, can be empty
+ * @return the IP string
+ */
+std::string get_local_ip(const std::string& priority_networks);
+
+} // namespace doris::cloud
diff --git a/cloud/src/meta-service/meta_server.cpp
b/cloud/src/meta-service/meta_server.cpp
index da1d6ecac33..f9b4bca146d 100644
--- a/cloud/src/meta-service/meta_server.cpp
+++ b/cloud/src/meta-service/meta_server.cpp
@@ -30,6 +30,7 @@
#include "common/config.h"
#include "common/metric.h"
+#include "common/network_util.h"
#include "common/sync_point.h"
#include "common/util.h"
#include "meta-service/keys.h"
@@ -96,7 +97,7 @@ void MetaServer::stop() {
void MetaServerRegister::prepare_registry(ServiceRegistryPB* reg) {
using namespace std::chrono;
auto now =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
- std::string ip = butil::my_ip_cstr();
+ std::string ip = get_local_ip(config::priority_networks);
int32_t port = config::brpc_listen_port;
std::string id = ip + ":" + std::to_string(port);
ServiceRegistryPB::Item item;
diff --git a/cloud/src/meta-service/meta_service_resource.cpp
b/cloud/src/meta-service/meta_service_resource.cpp
index 3e7e407f414..57220e50c5e 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -29,6 +29,7 @@
#include "common/encryption_util.h"
#include "common/logging.h"
+#include "common/network_util.h"
#include "common/string_util.h"
#include "common/sync_point.h"
#include "meta-service/keys.h"
@@ -3280,12 +3281,9 @@ void notify_refresh_instance(std::shared_ptr<TxnKv>
txn_kv, const std::string& i
<< " err=" << err;
return;
}
- std::string self_endpoint;
- if (config::hostname.empty()) {
- self_endpoint = fmt::format("{}:{}", butil::my_ip_cstr(),
config::brpc_listen_port);
- } else {
- self_endpoint = fmt::format("{}:{}", config::hostname,
config::brpc_listen_port);
- }
+ std::string self_endpoint =
+ config::hostname.empty() ? get_local_ip(config::priority_networks)
: config::hostname;
+ self_endpoint = fmt::format("{}:{}", self_endpoint,
config::brpc_listen_port);
ServiceRegistryPB reg;
reg.ParseFromString(val);
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 96cc5a4de42..331e85fab2e 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -49,6 +49,8 @@ add_executable(hdfs_accessor_test hdfs_accessor_test.cpp)
add_executable(stopwatch_test stopwatch_test.cpp)
+add_executable(network_util_test network_util_test.cpp)
+
message("Meta-service test dependencies: ${TEST_LINK_LIBS}")
target_link_libraries(sync_point_test ${TEST_LINK_LIBS})
@@ -78,6 +80,8 @@ target_link_libraries(hdfs_accessor_test ${TEST_LINK_LIBS})
target_link_libraries(stopwatch_test ${TEST_LINK_LIBS})
+target_link_libraries(network_util_test ${TEST_LINK_LIBS})
+
# FDB related tests need to be linked with libfdb_c
set(FDB_LINKER_FLAGS "-lfdb_c -L${THIRDPARTY_DIR}/lib")
diff --git a/cloud/test/network_util_test.cpp b/cloud/test/network_util_test.cpp
new file mode 100644
index 00000000000..06f425f244f
--- /dev/null
+++ b/cloud/test/network_util_test.cpp
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/network_util.h"
+
+#include "common/config.h"
+#include "gtest/gtest.h"
+
+int main(int argc, char** argv) { // Test entry point: initializes the cloud config, then runs every registered gtest case.
+ doris::cloud::config::init(nullptr, true); // presumably loads built-in defaults without a config file - TODO confirm
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+TEST(NetWorkUtilTest, GetLocaHostTest) { // Covers the empty-config, loopback-fallback and matching-CIDR paths of get_local_ip().
+ doris::cloud::config::priority_networks = "";
+ // With an empty config get_local_ip() returns butil::my_ip_cstr(); that IP is reused below
+ auto ip =
doris::cloud::get_local_ip(doris::cloud::config::priority_networks);
+ std::cout << "get ip: " << ip << " from butil::my_ip_cstr()" << std::endl;
+
+ { // loopback is skipped during CIDR matching, so 127.0.0.1 is expected via the loopback fallback
+ doris::cloud::config::priority_networks = "127.0.0.1/8";
+
ASSERT_EQ(doris::cloud::get_local_ip(doris::cloud::config::priority_networks),
"127.0.0.1");
+ }
+
+ { // assumes no local interface lies in 1.0.0.0/8, so the loopback fallback is expected
+ doris::cloud::config::priority_networks = "1.2.3.4/8";
+
ASSERT_EQ(doris::cloud::get_local_ip(doris::cloud::config::priority_networks),
"127.0.0.1");
+ }
+
+ { // a /16 built from the IP obtained above is expected to select that same IP
+ doris::cloud::config::priority_networks = ip + "/16";
+
ASSERT_EQ(doris::cloud::get_local_ip(doris::cloud::config::priority_networks),
ip);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]