This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new a461fa511 feat(fqdn): add operator< and use shared_ptr for group_host_port() instead of raw pointer for host_port (#1825)
a461fa511 is described below

commit a461fa511a409863c592a9f624c553c1485c6404
Author: Guohao Li <[email protected]>
AuthorDate: Thu Jan 11 14:26:59 2024 +0800

    feat(fqdn): add operator< and use shared_ptr for group_host_port() instead of raw pointer for host_port (#1825)
    
    https://github.com/apache/incubator-pegasus/issues/1824
---
 src/runtime/rpc/dns_resolver.cpp    |  1 +
 src/runtime/rpc/group_host_port.h   |  4 +--
 src/runtime/rpc/rpc_host_port.cpp   | 15 ++++-----
 src/runtime/rpc/rpc_host_port.h     | 22 +++++++++++--
 src/runtime/test/host_port_test.cpp | 66 ++++++++++++++++++++++++++++---------
 5 files changed, 79 insertions(+), 29 deletions(-)

diff --git a/src/runtime/rpc/dns_resolver.cpp b/src/runtime/rpc/dns_resolver.cpp
index 0cc9a0616..8ccbdac56 100644
--- a/src/runtime/rpc/dns_resolver.cpp
+++ b/src/runtime/rpc/dns_resolver.cpp
@@ -18,6 +18,7 @@
  */
 
 #include <algorithm>
+#include <memory>
 #include <utility>
 
 #include "fmt/format.h"
diff --git a/src/runtime/rpc/group_host_port.h b/src/runtime/rpc/group_host_port.h
index cf7cab19b..1bc8989ea 100644
--- a/src/runtime/rpc/group_host_port.h
+++ b/src/runtime/rpc/group_host_port.h
@@ -44,7 +44,7 @@ static constexpr int kInvalidIndex = -1;
 //  group.group_host_port()->add(host_port("test_fqdn", 34602));
 //  group.group_host_port()->add(host_port("test_fqdn", 34603));
 //
-class rpc_group_host_port : public ref_counter
+class rpc_group_host_port
 {
 public:
     rpc_group_host_port(const char *name);
@@ -170,12 +170,12 @@ inline void rpc_group_host_port::leader_forward()
 
 inline void rpc_group_host_port::set_leader(const host_port &hp)
 {
-    CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must 
be ipv4");
     awl_t l(_lock);
     if (hp.is_invalid()) {
         _leader_index = kInvalidIndex;
         return;
     }
+    CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must 
be ipv4");
     for (int i = 0; i < _members.size(); i++) {
         if (_members[i] == hp) {
             _leader_index = i;
diff --git a/src/runtime/rpc/rpc_host_port.cpp b/src/runtime/rpc/rpc_host_port.cpp
index fa4d36c9c..2da05e7b8 100644
--- a/src/runtime/rpc/rpc_host_port.cpp
+++ b/src/runtime/rpc/rpc_host_port.cpp
@@ -20,8 +20,8 @@
 #include <errno.h>
 #include <netdb.h>
 #include <netinet/in.h>
-#include <string.h>
 #include <sys/socket.h>
+#include <cstring>
 #include <memory>
 #include <unordered_set>
 #include <utility>
@@ -79,7 +79,7 @@ host_port::host_port(rpc_address addr)
         _port = addr.port();
     } break;
     case HOST_TYPE_GROUP: {
-        _group_host_port = new rpc_group_host_port(addr.group_address());
+        _group_host_port = 
std::make_shared<rpc_group_host_port>(addr.group_address());
     } break;
     default:
         break;
@@ -120,7 +120,7 @@ void host_port::reset()
         _port = 0;
         break;
     case HOST_TYPE_GROUP:
-        group_host_port()->release_ref();
+        _group_host_port = nullptr;
         break;
     default:
         break;
@@ -142,7 +142,6 @@ host_port &host_port::operator=(const host_port &other)
         break;
     case HOST_TYPE_GROUP:
         _group_host_port = other._group_host_port;
-        group_host_port()->add_ref();
         break;
     default:
         break;
@@ -157,9 +156,9 @@ std::string host_port::to_string() const
     case HOST_TYPE_IPV4:
         return fmt::format("{}:{}", _host, _port);
     case HOST_TYPE_GROUP:
-        return fmt::format("address group {}", group_host_port()->name());
+        return fmt::format("host_port group {}", group_host_port()->name());
     default:
-        return "invalid address";
+        return "invalid host_port";
     }
 }
 
@@ -167,9 +166,7 @@ void host_port::assign_group(const char *name)
 {
     reset();
     _type = HOST_TYPE_GROUP;
-    _group_host_port = new rpc_group_host_port(name);
-    // take the lifetime of rpc_uri_address, release_ref when change value or 
call destructor
-    _group_host_port->add_ref();
+    _group_host_port = std::make_shared<rpc_group_host_port>(name);
 }
 
 error_s host_port::resolve_addresses(std::vector<rpc_address> &addresses) const
diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h
index 763c8ac91..4cda4180b 100644
--- a/src/runtime/rpc/rpc_host_port.h
+++ b/src/runtime/rpc/rpc_host_port.h
@@ -24,6 +24,7 @@
 // IWYU pragma: no_include <experimental/string_view>
 #include <functional>
 #include <iosfwd>
+#include <memory>
 #include <string>
 #include <string_view>
 #include <vector>
@@ -72,7 +73,7 @@ public:
         return os << hp.to_string();
     }
 
-    rpc_group_host_port *group_host_port() const
+    std::shared_ptr<rpc_group_host_port> group_host_port() const
     {
         CHECK_NOTNULL(_group_host_port, "group_host_port cannot be null!");
         return _group_host_port;
@@ -93,7 +94,7 @@ private:
     std::string _host;
     uint16_t _port = 0;
     dsn_host_type_t _type = HOST_TYPE_INVALID;
-    rpc_group_host_port *_group_host_port = nullptr;
+    std::shared_ptr<rpc_group_host_port> _group_host_port;
 };
 
 inline bool operator==(const host_port &hp1, const host_port &hp2)
@@ -118,6 +119,21 @@ inline bool operator==(const host_port &hp1, const 
host_port &hp2)
 
 inline bool operator!=(const host_port &hp1, const host_port &hp2) { return 
!(hp1 == hp2); }
 
+// Strict weak ordering for host_port: by type, then host/port (IPv4) or group object address.
+inline bool operator<(const host_port &hp1, const host_port &hp2)
+{
+    if (hp1.type() != hp2.type()) {
+        return hp1.type() < hp2.type();
+    }
+    switch (hp1.type()) {
+    case HOST_TYPE_IPV4:
+        return hp1.host() < hp2.host() || (hp1.host() == hp2.host() && hp1.port() < hp2.port());
+    case HOST_TYPE_GROUP: // std::less yields a total order over unrelated pointers, raw '<' does not.
+        return std::less<const void *>()(hp1.group_host_port().get(), hp2.group_host_port().get());
+    default: // Same non-IPv4/non-group type on both sides: equivalent, so neither is less.
+        return false;
+    }
+}
 } // namespace dsn
 
 USER_DEFINED_STRUCTURE_FORMATTER(::dsn::host_port);
@@ -132,7 +148,7 @@ struct hash<::dsn::host_port>
         case HOST_TYPE_IPV4:
             return std::hash<std::string>()(hp.host()) ^ 
std::hash<uint16_t>()(hp.port());
         case HOST_TYPE_GROUP:
-            return std::hash<void *>()(hp.group_host_port());
+            return std::hash<void *>()(hp.group_host_port().get());
         default:
             return 0;
         }
diff --git a/src/runtime/test/host_port_test.cpp b/src/runtime/test/host_port_test.cpp
index 0cd656a07..05d6982b2 100644
--- a/src/runtime/test/host_port_test.cpp
+++ b/src/runtime/test/host_port_test.cpp
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-#include <string.h>
+#include <memory>
 #include <string>
 #include <utility>
 #include <vector>
@@ -45,12 +45,12 @@ TEST(host_port_test, host_port_to_string)
 {
     {
         host_port hp = host_port("localhost", 8080);
-        ASSERT_EQ(std::string("localhost:8080"), hp.to_string());
+        ASSERT_EQ("localhost:8080", hp.to_string());
     }
 
     {
         host_port hp;
-        ASSERT_EQ(std::string("invalid address"), hp.to_string());
+        ASSERT_EQ("invalid host_port", hp.to_string());
     }
 }
 
@@ -100,6 +100,9 @@ TEST(host_port_test, operators)
     std::string hp_str2 = "pegasus:8080";
     ASSERT_FALSE(hp4.from_string(hp_str2));
     ASSERT_TRUE(hp4.is_invalid());
+
+    host_port hp5("localhost", 8081);
+    ASSERT_LT(hp, hp5);
 }
 
 TEST(host_port_test, rpc_group_host_port)
@@ -111,8 +114,8 @@ TEST(host_port_test, rpc_group_host_port)
     host_port hp_grp;
     hp_grp.assign_group("test_group");
     ASSERT_EQ(HOST_TYPE_GROUP, hp_grp.type());
-    rpc_group_host_port *g = hp_grp.group_host_port();
-    ASSERT_EQ(std::string("test_group"), g->name());
+    const auto &g = hp_grp.group_host_port();
+    ASSERT_STREQ("test_group", g->name());
 
     // invalid_hp
     ASSERT_FALSE(g->remove(hp));
@@ -166,6 +169,40 @@ TEST(host_port_test, rpc_group_host_port)
     ASSERT_FALSE(g->contains(hp2));
     ASSERT_EQ(0u, g->members().size());
     ASSERT_EQ(invalid_hp, g->leader());
+
+    // operator <
+    host_port hp_grp1;
+    hp_grp1.assign_group("test_group");
+    if (hp_grp.group_host_port().get() < hp_grp1.group_host_port().get()) {
+        ASSERT_LT(hp_grp, hp_grp1);
+    } else {
+        ASSERT_FALSE(hp_grp < hp_grp1);
+    }
+
+    // address_group -> host_port_group
+    rpc_address addr("127.0.0.1", 8080);
+    rpc_address addr2("127.0.0.1", 8081);
+
+    rpc_address addr_grp;
+    addr_grp.assign_group("test_group");
+    ASSERT_EQ(HOST_TYPE_GROUP, addr_grp.type());
+
+    auto g_addr = addr_grp.group_address();
+    ASSERT_STREQ("test_group", g_addr->name());
+
+    ASSERT_TRUE(g_addr->add(addr));
+    g_addr->set_leader(addr2);
+    ASSERT_EQ(addr2, g_addr->leader());
+    ASSERT_EQ(2, g_addr->count());
+
+    host_port hp_grp2;
+    hp_grp2 = host_port(addr_grp);
+    ASSERT_EQ(HOST_TYPE_GROUP, hp_grp2.type());
+
+    auto g_hp = hp_grp2.group_host_port();
+    ASSERT_STREQ("test_group", g_hp->name());
+    ASSERT_EQ(hp2, g_hp->leader());
+    ASSERT_EQ(2, g_hp->count());
 }
 
 TEST(host_port_test, transfer_rpc_address)
@@ -204,27 +241,26 @@ TEST(host_port_test, dns_resolver)
     {
         host_port hp_grp;
         hp_grp.assign_group("test_group");
-        rpc_group_host_port *g = hp_grp.group_host_port();
+        auto g_hp = hp_grp.group_host_port();
 
         host_port hp1("localhost", 8080);
-        ASSERT_TRUE(g->add(hp1));
+        ASSERT_TRUE(g_hp->add(hp1));
         host_port hp2("localhost", 8081);
-        g->set_leader(hp2);
+        g_hp->set_leader(hp2);
 
         auto addr_grp = resolver.resolve_address(hp_grp);
+        auto g_addr = addr_grp.group_address();
 
-        ASSERT_EQ(addr_grp.group_address()->is_update_leader_automatically(),
-                  hp_grp.group_host_port()->is_update_leader_automatically());
-        ASSERT_EQ(strcmp(addr_grp.group_address()->name(), 
hp_grp.group_host_port()->name()), 0);
-        ASSERT_EQ(addr_grp.group_address()->count(), 
hp_grp.group_host_port()->count());
-        ASSERT_EQ(host_port(addr_grp.group_address()->leader()),
-                  hp_grp.group_host_port()->leader());
+        ASSERT_EQ(g_addr->is_update_leader_automatically(), 
g_hp->is_update_leader_automatically());
+        ASSERT_STREQ(g_addr->name(), g_hp->name());
+        ASSERT_EQ(g_addr->count(), g_hp->count());
+        ASSERT_EQ(host_port(g_addr->leader()), g_hp->leader());
     }
 }
 
 void send_and_check_host_port_by_serialize(const host_port &hp, 
dsn_msg_serialize_format t)
 {
-    auto hp_str = hp.to_string();
+    const auto &hp_str = hp.to_string();
     ::dsn::rpc_address server("localhost", 20101);
 
     dsn::message_ptr msg_ptr = 
dsn::message_ex::create_request(RPC_TEST_THRIFT_HOST_PORT_PARSER);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to