This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b3d27d9c57 [feature](cloud) Add priority network support for 
meta-service registry (#33931)
4b3d27d9c57 is described below

commit 4b3d27d9c578c2c1d8c81c23f4c7b0422196f3d2
Author: Gavin Chou <[email protected]>
AuthorDate: Mon Apr 22 12:23:15 2024 +0800

    [feature](cloud) Add priority network support for meta-service registry 
(#33931)
---
 cloud/src/common/CMakeLists.txt                  |   1 +
 cloud/src/common/config.h                        |   8 +
 cloud/src/common/network_util.cpp                | 238 +++++++++++++++++++++++
 cloud/src/common/network_util.h                  |  34 ++++
 cloud/src/meta-service/meta_server.cpp           |   3 +-
 cloud/src/meta-service/meta_service_resource.cpp |  10 +-
 cloud/test/CMakeLists.txt                        |   4 +
 cloud/test/network_util_test.cpp                 |  49 +++++
 8 files changed, 340 insertions(+), 7 deletions(-)

diff --git a/cloud/src/common/CMakeLists.txt b/cloud/src/common/CMakeLists.txt
index 166ad5052da..a2abfb075bf 100644
--- a/cloud/src/common/CMakeLists.txt
+++ b/cloud/src/common/CMakeLists.txt
@@ -13,6 +13,7 @@ set(COMMON_FILES
     encryption_util.cpp
     metric.cpp
     kms.cpp
+    network_util.cpp
 )
 
 if (USE_JEMALLOC)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 280faf9014c..73b8580c1f6 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -170,4 +170,12 @@ CONF_String(kerberos_krb5_conf_path, "/etc/krb5.conf");
 
 CONF_mBool(enable_distinguish_hdfs_path, "true");
 
+// Selection strategy for hosts that have multiple IPs; at most one IP should match.
+// The value is a semicolon-delimited list of networks in CIDR notation,
+// e.g. 10.10.10.0/24
+// e.g. 10.10.10.0/24;192.168.0.1/24
+// If empty, the IP bound to the hostname is used. If no non-loopback IP matches,
+// a loopback IP is used, or the IP bound to the hostname if there is no loopback IP.
+CONF_String(priority_networks, "");
+
 } // namespace doris::cloud::config
diff --git a/cloud/src/common/network_util.cpp 
b/cloud/src/common/network_util.cpp
new file mode 100644
index 00000000000..9f1b085163d
--- /dev/null
+++ b/cloud/src/common/network_util.cpp
@@ -0,0 +1,238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/network_util.h"
+
+#include <arpa/inet.h>
+#include <butil/endpoint.h>
+#include <butil/strings/string_split.h>
+#include <ifaddrs.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+
+#include <cerrno>
+#include <cstring>
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include "common/logging.h"
+
+namespace doris::cloud {
+
+// An IPv4 network in CIDR notation, used to test whether an address belongs to it.
+class CIDR {
+public:
+    CIDR() = default;
+
+    // Parses `cidr_str`, either "a.b.c.d/len" with 1 <= len <= 32 or a bare address
+    // (treated as /32). Returns false and logs a warning on malformed input.
+    bool reset(const std::string& cidr_str) {
+        address_ = 0;
+        netmask_ = 0xffffffff;
+
+        // A bare address denotes a single-host network.
+        std::string cidr_format_str = cidr_str;
+        if (cidr_str.find('/') == std::string::npos) {
+            cidr_format_str.append("/32");
+        }
+        VLOG_DEBUG << "cidr format str: " << cidr_format_str;
+
+        std::vector<std::string> cidr_items;
+        butil::SplitString(cidr_format_str, '/', &cidr_items);
+        if (cidr_items.size() != 2) {
+            LOG(WARNING) << "wrong CIDR format. network=" << cidr_str;
+            return false;
+        }
+
+        const std::string& mask_str = cidr_items[1];
+        if (mask_str.empty()) {
+            LOG(WARNING) << "wrong CIDR mask format. network=" << cidr_str;
+            return false;
+        }
+
+        // strtol reports overflow only through errno, so clear it first. Keep the result
+        // as a long and range-check it before use so huge values cannot wrap into a valid
+        // length, and require the whole string to be consumed so "24abc" is rejected.
+        errno = 0;
+        char* endptr = nullptr;
+        const long mask_length = strtol(mask_str.c_str(), &endptr, 10);
+        const int parse_errno = errno;
+        if (parse_errno != 0 || endptr == mask_str.c_str() || *endptr != '\0') {
+            LOG(WARNING) << "wrong CIDR mask format. network=" << cidr_str
+                         << ", mask=" << mask_str << ", errno=" << parse_errno;
+            return false;
+        }
+        // /0 is rejected: shifting a 32-bit mask by 32 below would be undefined behavior.
+        if (mask_length <= 0 || mask_length > 32) {
+            LOG(WARNING) << "wrong CIDR mask format. network=" << cidr_str
+                         << ", mask_length=" << mask_length;
+            return false;
+        }
+
+        uint32_t address = 0;
+        if (!ip_to_int(cidr_items[0], &address)) {
+            LOG(WARNING) << "wrong CIDR IP value. network=" << cidr_str;
+            return false;
+        }
+        address_ = address;
+        netmask_ = 0xffffffffU << (32 - mask_length);
+        return true;
+    }
+
+    // Returns true if `ip` lies inside this network; false if it does not or if
+    // inet_aton cannot parse it.
+    bool contains(const std::string& ip) const {
+        uint32_t ip_int = 0;
+        if (!ip_to_int(ip, &ip_int)) {
+            return false;
+        }
+        return (address_ & netmask_) == (ip_int & netmask_);
+    }
+
+private:
+    // Converts an IPv4 string accepted by inet_aton into a host-byte-order integer.
+    static bool ip_to_int(const std::string& ip_str, uint32_t* value) {
+        struct in_addr addr;
+        if (inet_aton(ip_str.c_str(), &addr) == 0) {
+            return false;
+        }
+        *value = ntohl(addr.s_addr);
+        return true;
+    }
+
+    uint32_t address_ = 0;
+    uint32_t netmask_ = 0xffffffff;
+};
+
+// Copy of an IPv4 socket address taken from a getifaddrs() entry.
+class InetAddress {
+public:
+    // `addr` must point to an AF_INET address (a sockaddr_in). Its bytes are copied with
+    // memcpy instead of dereferencing a cast pointer, avoiding type punning.
+    explicit InetAddress(const struct sockaddr* addr) {
+        std::memcpy(&addr_, addr, sizeof(addr_));
+    }
+
+    bool is_address_v4() const { return addr_.sin_family == AF_INET; }
+
+    // True for any address in 127.0.0.0/8.
+    bool is_loopback_v4() const {
+        in_addr_t s_addr = addr_.sin_addr.s_addr;
+        return (ntohl(s_addr) & 0xFF000000) == 0x7F000000;
+    }
+
+    // Dotted-decimal text of the address; empty if inet_ntop fails.
+    std::string get_host_address_v4() const {
+        char addr_buf[INET_ADDRSTRLEN];
+        if (inet_ntop(AF_INET, &(addr_.sin_addr), addr_buf, INET_ADDRSTRLEN) == nullptr) {
+            return std::string();
+        }
+        return std::string(addr_buf);
+    }
+
+private:
+    struct sockaddr_in addr_ {};
+};
+
+// Appends every IPv4 interface address of this host to `hosts`.
+// Returns false (after logging errno details) if getifaddrs() fails; the caller decides
+// whether that is fatal.
+static bool get_hosts_v4(std::vector<InetAddress>* hosts) {
+    ifaddrs* if_addrs = nullptr;
+    if (getifaddrs(&if_addrs) != 0) {
+        char buf[64];
+        LOG(WARNING) << "getifaddrs failed because " << strerror_r(errno, buf, sizeof(buf));
+        return false;
+    }
+    // Releases the interface list on every exit path (no-op if it is null).
+    std::unique_ptr<ifaddrs, decltype(&freeifaddrs)> guard(if_addrs, &freeifaddrs);
+
+    for (const ifaddrs* it = if_addrs; it != nullptr; it = it->ifa_next) {
+        // Entries may carry no address at all; keep only IPv4 ones.
+        if (it->ifa_addr != nullptr && it->ifa_addr->sa_family == AF_INET) {
+            hosts->emplace_back(it->ifa_addr);
+        }
+    }
+    return true;
+}
+
+// Chooses the local IPv4 address to advertise.
+// - Empty priority_networks (or only separators): butil::my_ip_cstr().
+// - Otherwise: the first non-loopback interface IP inside one of the CIDRs; failing that
+//   the last loopback IP seen; failing that butil::my_ip_cstr().
+// A malformed CIDR entry or an interface-enumeration failure is fatal (LOG(FATAL)).
+std::string get_local_ip(const std::string& priority_networks) {
+    std::string localhost_str = butil::my_ip_cstr();
+    if (priority_networks.empty()) {
+        LOG(INFO) << "use butil::my_ip_cstr(), local host ip=" << localhost_str;
+        return localhost_str;
+    }
+
+    LOG(INFO) << "priority CIDRs: " << priority_networks;
+    std::vector<std::string> cidr_strs;
+    butil::SplitString(priority_networks, ';', &cidr_strs);
+    std::vector<CIDR> priority_cidrs;
+    priority_cidrs.reserve(cidr_strs.size());
+    for (const auto& cidr_str : cidr_strs) {
+        // SplitString produces empty entries for stray separators, e.g. the trailing ';'
+        // in "10.0.0.0/8;". Skip them instead of aborting on them as malformed CIDRs.
+        if (cidr_str.empty()) {
+            continue;
+        }
+        CIDR cidr;
+        if (!cidr.reset(cidr_str)) {
+            LOG(FATAL) << "wrong cidr format. cidr_str=" << cidr_str;
+            return localhost_str;
+        }
+        priority_cidrs.push_back(cidr);
+    }
+    if (priority_cidrs.empty()) {
+        // Only separators were configured: treat it like an empty priority_networks.
+        LOG(INFO) << "use butil::my_ip_cstr(), local host ip=" << localhost_str;
+        return localhost_str;
+    }
+
+    std::vector<InetAddress> hosts;
+    if (!get_hosts_v4(&hosts)) {
+        LOG(FATAL) << "failed to getifaddrs";
+        return localhost_str;
+    }
+    if (hosts.empty()) {
+        LOG(FATAL) << "failed to get host";
+        return localhost_str;
+    }
+
+    auto is_in_prior_network = [&priority_cidrs](const std::string& ip) {
+        for (auto& cidr : priority_cidrs) {
+            if (cidr.contains(ip)) {
+                return true;
+            }
+        }
+        return false;
+    };
+
+    // Loopback addresses are never matched against the CIDRs; the last one seen is kept
+    // as a fallback for when no other address matches.
+    std::string loopback;
+    for (auto& host : hosts) {
+        if (!host.is_address_v4()) {
+            continue;
+        }
+        const std::string ip = host.get_host_address_v4();
+        VLOG_DEBUG << "check ip=" << ip;
+        if (host.is_loopback_v4()) {
+            loopback = ip;
+        } else if (is_in_prior_network(ip)) {
+            LOG(INFO) << "local host ip=" << ip;
+            return ip;
+        }
+    }
+
+    if (!loopback.empty()) {
+        LOG(WARNING) << "fail to find one valid non-loopback address, use loopback address: "
+                     << loopback;
+        return loopback;
+    }
+
+    localhost_str = butil::my_ip_cstr();
+    LOG(WARNING) << "fail to find valid address of priority cidrs in conf, use butil::my_ip_cstr(): "
+                 << localhost_str;
+    return localhost_str;
+}
+
+} // namespace doris::cloud
diff --git a/cloud/src/common/network_util.h b/cloud/src/common/network_util.h
new file mode 100644
index 00000000000..ebd7ae2e9fc
--- /dev/null
+++ b/cloud/src/common/network_util.h
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+namespace doris::cloud {
+
+/**
+ * Gets the local IP that matches the given CIDR masks.
+ * * If priority_networks is empty, the IP bound to the hostname is returned.
+ * * If no IP fits the CIDR masks, a loopback IP is returned (or, if there is none,
+ *   the IP bound to the hostname).
+ * @param priority_networks semicolon-delimited CIDR list, can be empty
+ * @return the IP string
+ */
+std::string get_local_ip(const std::string& priority_networks);
+
+} // namespace doris::cloud
diff --git a/cloud/src/meta-service/meta_server.cpp 
b/cloud/src/meta-service/meta_server.cpp
index da1d6ecac33..f9b4bca146d 100644
--- a/cloud/src/meta-service/meta_server.cpp
+++ b/cloud/src/meta-service/meta_server.cpp
@@ -30,6 +30,7 @@
 
 #include "common/config.h"
 #include "common/metric.h"
+#include "common/network_util.h"
 #include "common/sync_point.h"
 #include "common/util.h"
 #include "meta-service/keys.h"
@@ -96,7 +97,7 @@ void MetaServer::stop() {
 void MetaServerRegister::prepare_registry(ServiceRegistryPB* reg) {
     using namespace std::chrono;
     auto now = 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
-    std::string ip = butil::my_ip_cstr();
+    std::string ip = get_local_ip(config::priority_networks);
     int32_t port = config::brpc_listen_port;
     std::string id = ip + ":" + std::to_string(port);
     ServiceRegistryPB::Item item;
diff --git a/cloud/src/meta-service/meta_service_resource.cpp 
b/cloud/src/meta-service/meta_service_resource.cpp
index 3e7e407f414..57220e50c5e 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -29,6 +29,7 @@
 
 #include "common/encryption_util.h"
 #include "common/logging.h"
+#include "common/network_util.h"
 #include "common/string_util.h"
 #include "common/sync_point.h"
 #include "meta-service/keys.h"
@@ -3280,12 +3281,9 @@ void notify_refresh_instance(std::shared_ptr<TxnKv> 
txn_kv, const std::string& i
                      << " err=" << err;
         return;
     }
-    std::string self_endpoint;
-    if (config::hostname.empty()) {
-        self_endpoint = fmt::format("{}:{}", butil::my_ip_cstr(), 
config::brpc_listen_port);
-    } else {
-        self_endpoint = fmt::format("{}:{}", config::hostname, 
config::brpc_listen_port);
-    }
+    std::string self_endpoint =
+            config::hostname.empty() ? get_local_ip(config::priority_networks) 
: config::hostname;
+    self_endpoint = fmt::format("{}:{}", self_endpoint, 
config::brpc_listen_port);
     ServiceRegistryPB reg;
     reg.ParseFromString(val);
 
diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt
index 96cc5a4de42..331e85fab2e 100644
--- a/cloud/test/CMakeLists.txt
+++ b/cloud/test/CMakeLists.txt
@@ -49,6 +49,8 @@ add_executable(hdfs_accessor_test hdfs_accessor_test.cpp)
 
 add_executable(stopwatch_test stopwatch_test.cpp)
 
+add_executable(network_util_test network_util_test.cpp)
+
 message("Meta-service test dependencies: ${TEST_LINK_LIBS}")
 target_link_libraries(sync_point_test ${TEST_LINK_LIBS})
 
@@ -78,6 +80,8 @@ target_link_libraries(hdfs_accessor_test ${TEST_LINK_LIBS})
 
 target_link_libraries(stopwatch_test ${TEST_LINK_LIBS})
 
+target_link_libraries(network_util_test ${TEST_LINK_LIBS})
+
 # FDB related tests need to be linked with libfdb_c
 set(FDB_LINKER_FLAGS "-lfdb_c -L${THIRDPARTY_DIR}/lib")
 
diff --git a/cloud/test/network_util_test.cpp b/cloud/test/network_util_test.cpp
new file mode 100644
index 00000000000..06f425f244f
--- /dev/null
+++ b/cloud/test/network_util_test.cpp
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/network_util.h"
+
+#include "common/config.h"
+#include "gtest/gtest.h"
+
+int main(int argc, char** argv) {
+    // Initialize doris::cloud::config before any test reads it
+    // (nullptr presumably means "no config file" -- TODO confirm).
+    doris::cloud::config::init(nullptr, true);
+    testing::InitGoogleTest(&argc, argv);
+    const int rc = RUN_ALL_TESTS();
+    return rc;
+}
+
+TEST(NetWorkUtilTest, GetLocaHostTest) {
+    using doris::cloud::get_local_ip;
+    auto& networks = doris::cloud::config::priority_networks;
+
+    // With no priority networks the result comes from butil::my_ip_cstr(); it is reused
+    // below as an address known to belong to this host.
+    networks = "";
+    const std::string ip = get_local_ip(networks);
+    std::cout << "get ip: " << ip << " from butil::my_ip_cstr()" << std::endl;
+
+    // Loopback addresses are never matched directly, so a loopback-only network still
+    // ends in the loopback fallback.
+    networks = "127.0.0.1/8";
+    ASSERT_EQ(get_local_ip(networks), "127.0.0.1");
+
+    // A network no interface belongs to also falls back to the loopback address.
+    networks = "1.2.3.4/8";
+    ASSERT_EQ(get_local_ip(networks), "127.0.0.1");
+
+    // A /16 around the hostname IP must yield that IP.
+    networks = ip + "/16";
+    ASSERT_EQ(get_local_ip(networks), ip);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to