This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new a461fa511 feat(fqdn): add operator< and use shared_ptr for
group_host_port() instead of raw pointer for host_port (#1825)
a461fa511 is described below
commit a461fa511a409863c592a9f624c553c1485c6404
Author: Guohao Li <[email protected]>
AuthorDate: Thu Jan 11 14:26:59 2024 +0800
feat(fqdn): add operator< and use shared_ptr for group_host_port() instead
of raw pointer for host_port (#1825)
https://github.com/apache/incubator-pegasus/issues/1824
---
src/runtime/rpc/dns_resolver.cpp | 1 +
src/runtime/rpc/group_host_port.h | 4 +--
src/runtime/rpc/rpc_host_port.cpp | 15 ++++-----
src/runtime/rpc/rpc_host_port.h | 22 +++++++++++--
src/runtime/test/host_port_test.cpp | 66 ++++++++++++++++++++++++++++---------
5 files changed, 79 insertions(+), 29 deletions(-)
diff --git a/src/runtime/rpc/dns_resolver.cpp b/src/runtime/rpc/dns_resolver.cpp
index 0cc9a0616..8ccbdac56 100644
--- a/src/runtime/rpc/dns_resolver.cpp
+++ b/src/runtime/rpc/dns_resolver.cpp
@@ -18,6 +18,7 @@
*/
#include <algorithm>
+#include <memory>
#include <utility>
#include "fmt/format.h"
diff --git a/src/runtime/rpc/group_host_port.h
b/src/runtime/rpc/group_host_port.h
index cf7cab19b..1bc8989ea 100644
--- a/src/runtime/rpc/group_host_port.h
+++ b/src/runtime/rpc/group_host_port.h
@@ -44,7 +44,7 @@ static constexpr int kInvalidIndex = -1;
// group.group_host_port()->add(host_port("test_fqdn", 34602));
// group.group_host_port()->add(host_port("test_fqdn", 34603));
//
-class rpc_group_host_port : public ref_counter
+class rpc_group_host_port
{
public:
rpc_group_host_port(const char *name);
@@ -170,12 +170,12 @@ inline void rpc_group_host_port::leader_forward()
inline void rpc_group_host_port::set_leader(const host_port &hp)
{
- CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must
be ipv4");
awl_t l(_lock);
if (hp.is_invalid()) {
_leader_index = kInvalidIndex;
return;
}
+ CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must
be ipv4");
for (int i = 0; i < _members.size(); i++) {
if (_members[i] == hp) {
_leader_index = i;
diff --git a/src/runtime/rpc/rpc_host_port.cpp
b/src/runtime/rpc/rpc_host_port.cpp
index fa4d36c9c..2da05e7b8 100644
--- a/src/runtime/rpc/rpc_host_port.cpp
+++ b/src/runtime/rpc/rpc_host_port.cpp
@@ -20,8 +20,8 @@
#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
-#include <string.h>
#include <sys/socket.h>
+#include <cstring>
#include <memory>
#include <unordered_set>
#include <utility>
@@ -79,7 +79,7 @@ host_port::host_port(rpc_address addr)
_port = addr.port();
} break;
case HOST_TYPE_GROUP: {
- _group_host_port = new rpc_group_host_port(addr.group_address());
+ _group_host_port =
std::make_shared<rpc_group_host_port>(addr.group_address());
} break;
default:
break;
@@ -120,7 +120,7 @@ void host_port::reset()
_port = 0;
break;
case HOST_TYPE_GROUP:
- group_host_port()->release_ref();
+ _group_host_port = nullptr;
break;
default:
break;
@@ -142,7 +142,6 @@ host_port &host_port::operator=(const host_port &other)
break;
case HOST_TYPE_GROUP:
_group_host_port = other._group_host_port;
- group_host_port()->add_ref();
break;
default:
break;
@@ -157,9 +156,9 @@ std::string host_port::to_string() const
case HOST_TYPE_IPV4:
return fmt::format("{}:{}", _host, _port);
case HOST_TYPE_GROUP:
- return fmt::format("address group {}", group_host_port()->name());
+ return fmt::format("host_port group {}", group_host_port()->name());
default:
- return "invalid address";
+ return "invalid host_port";
}
}
@@ -167,9 +166,7 @@ void host_port::assign_group(const char *name)
{
reset();
_type = HOST_TYPE_GROUP;
- _group_host_port = new rpc_group_host_port(name);
- // take the lifetime of rpc_uri_address, release_ref when change value or
call destructor
- _group_host_port->add_ref();
+ _group_host_port = std::make_shared<rpc_group_host_port>(name);
}
error_s host_port::resolve_addresses(std::vector<rpc_address> &addresses) const
diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h
index 763c8ac91..4cda4180b 100644
--- a/src/runtime/rpc/rpc_host_port.h
+++ b/src/runtime/rpc/rpc_host_port.h
@@ -24,6 +24,7 @@
// IWYU pragma: no_include <experimental/string_view>
#include <functional>
#include <iosfwd>
+#include <memory>
#include <string>
#include <string_view>
#include <vector>
@@ -72,7 +73,7 @@ public:
return os << hp.to_string();
}
- rpc_group_host_port *group_host_port() const
+ std::shared_ptr<rpc_group_host_port> group_host_port() const
{
CHECK_NOTNULL(_group_host_port, "group_host_port cannot be null!");
return _group_host_port;
@@ -93,7 +94,7 @@ private:
std::string _host;
uint16_t _port = 0;
dsn_host_type_t _type = HOST_TYPE_INVALID;
- rpc_group_host_port *_group_host_port = nullptr;
+ std::shared_ptr<rpc_group_host_port> _group_host_port;
};
inline bool operator==(const host_port &hp1, const host_port &hp2)
@@ -118,6 +119,21 @@ inline bool operator==(const host_port &hp1, const
host_port &hp2)
inline bool operator!=(const host_port &hp1, const host_port &hp2) { return
!(hp1 == hp2); }
+// Orders host_port values so they can be used as keys in ordered containers
+// and with sorting algorithms.
+//
+// Values of different types are ordered by their type. Values of the same type
+// are ordered as follows:
+//  - HOST_TYPE_IPV4:  by host, then by port.
+//  - HOST_TYPE_GROUP: by the identity (address) of the shared group object.
+//  - anything else (e.g. an invalid host_port): treated as equivalent, so
+//    neither one is less than the other.
+//
+// Returns true only if 'hp1' is ordered strictly before 'hp2'.
+inline bool operator<(const host_port &hp1, const host_port &hp2)
+{
+    if (hp1.type() != hp2.type()) {
+        return hp1.type() < hp2.type();
+    }
+
+    switch (hp1.type()) {
+    case HOST_TYPE_IPV4:
+        return hp1.host() < hp2.host() || (hp1.host() == hp2.host() && hp1.port() < hp2.port());
+    case HOST_TYPE_GROUP:
+        // The two pointers refer to unrelated objects, for which the built-in '<' gives
+        // an unspecified result; std::less guarantees a consistent total order.
+        return std::less<const rpc_group_host_port *>()(hp1.group_host_port().get(),
+                                                        hp2.group_host_port().get());
+    default:
+        // Must be false: returning true here would make 'a < a' hold and both 'a < b'
+        // and 'b < a' hold for two invalid host_ports, which violates the strict weak
+        // ordering that std::set / std::map / std::sort require.
+        return false;
+    }
+}
} // namespace dsn
USER_DEFINED_STRUCTURE_FORMATTER(::dsn::host_port);
@@ -132,7 +148,7 @@ struct hash<::dsn::host_port>
case HOST_TYPE_IPV4:
return std::hash<std::string>()(hp.host()) ^
std::hash<uint16_t>()(hp.port());
case HOST_TYPE_GROUP:
- return std::hash<void *>()(hp.group_host_port());
+ return std::hash<void *>()(hp.group_host_port().get());
default:
return 0;
}
diff --git a/src/runtime/test/host_port_test.cpp
b/src/runtime/test/host_port_test.cpp
index 0cd656a07..05d6982b2 100644
--- a/src/runtime/test/host_port_test.cpp
+++ b/src/runtime/test/host_port_test.cpp
@@ -17,7 +17,7 @@
* under the License.
*/
-#include <string.h>
+#include <memory>
#include <string>
#include <utility>
#include <vector>
@@ -45,12 +45,12 @@ TEST(host_port_test, host_port_to_string)
{
{
host_port hp = host_port("localhost", 8080);
- ASSERT_EQ(std::string("localhost:8080"), hp.to_string());
+ ASSERT_EQ("localhost:8080", hp.to_string());
}
{
host_port hp;
- ASSERT_EQ(std::string("invalid address"), hp.to_string());
+ ASSERT_EQ("invalid host_port", hp.to_string());
}
}
@@ -100,6 +100,9 @@ TEST(host_port_test, operators)
std::string hp_str2 = "pegasus:8080";
ASSERT_FALSE(hp4.from_string(hp_str2));
ASSERT_TRUE(hp4.is_invalid());
+
+ host_port hp5("localhost", 8081);
+ ASSERT_LT(hp, hp5);
}
TEST(host_port_test, rpc_group_host_port)
@@ -111,8 +114,8 @@ TEST(host_port_test, rpc_group_host_port)
host_port hp_grp;
hp_grp.assign_group("test_group");
ASSERT_EQ(HOST_TYPE_GROUP, hp_grp.type());
- rpc_group_host_port *g = hp_grp.group_host_port();
- ASSERT_EQ(std::string("test_group"), g->name());
+ const auto &g = hp_grp.group_host_port();
+ ASSERT_STREQ("test_group", g->name());
// invalid_hp
ASSERT_FALSE(g->remove(hp));
@@ -166,6 +169,40 @@ TEST(host_port_test, rpc_group_host_port)
ASSERT_FALSE(g->contains(hp2));
ASSERT_EQ(0u, g->members().size());
ASSERT_EQ(invalid_hp, g->leader());
+
+ // operator <
+ host_port hp_grp1;
+ hp_grp1.assign_group("test_group");
+ if (hp_grp.group_host_port().get() < hp_grp1.group_host_port().get()) {
+ ASSERT_LT(hp_grp, hp_grp1);
+ } else {
+ ASSERT_FALSE(hp_grp < hp_grp1);
+ }
+
+ // address_group -> host_port_group
+ rpc_address addr("127.0.0.1", 8080);
+ rpc_address addr2("127.0.0.1", 8081);
+
+ rpc_address addr_grp;
+ addr_grp.assign_group("test_group");
+ ASSERT_EQ(HOST_TYPE_GROUP, addr_grp.type());
+
+ auto g_addr = addr_grp.group_address();
+ ASSERT_STREQ("test_group", g_addr->name());
+
+ ASSERT_TRUE(g_addr->add(addr));
+ g_addr->set_leader(addr2);
+ ASSERT_EQ(addr2, g_addr->leader());
+ ASSERT_EQ(2, g_addr->count());
+
+ host_port hp_grp2;
+ hp_grp2 = host_port(addr_grp);
+ ASSERT_EQ(HOST_TYPE_GROUP, hp_grp2.type());
+
+ auto g_hp = hp_grp2.group_host_port();
+ ASSERT_STREQ("test_group", g_hp->name());
+ ASSERT_EQ(hp2, g_hp->leader());
+ ASSERT_EQ(2, g_hp->count());
}
TEST(host_port_test, transfer_rpc_address)
@@ -204,27 +241,26 @@ TEST(host_port_test, dns_resolver)
{
host_port hp_grp;
hp_grp.assign_group("test_group");
- rpc_group_host_port *g = hp_grp.group_host_port();
+ auto g_hp = hp_grp.group_host_port();
host_port hp1("localhost", 8080);
- ASSERT_TRUE(g->add(hp1));
+ ASSERT_TRUE(g_hp->add(hp1));
host_port hp2("localhost", 8081);
- g->set_leader(hp2);
+ g_hp->set_leader(hp2);
auto addr_grp = resolver.resolve_address(hp_grp);
+ auto g_addr = addr_grp.group_address();
- ASSERT_EQ(addr_grp.group_address()->is_update_leader_automatically(),
- hp_grp.group_host_port()->is_update_leader_automatically());
- ASSERT_EQ(strcmp(addr_grp.group_address()->name(),
hp_grp.group_host_port()->name()), 0);
- ASSERT_EQ(addr_grp.group_address()->count(),
hp_grp.group_host_port()->count());
- ASSERT_EQ(host_port(addr_grp.group_address()->leader()),
- hp_grp.group_host_port()->leader());
+ ASSERT_EQ(g_addr->is_update_leader_automatically(),
g_hp->is_update_leader_automatically());
+ ASSERT_STREQ(g_addr->name(), g_hp->name());
+ ASSERT_EQ(g_addr->count(), g_hp->count());
+ ASSERT_EQ(host_port(g_addr->leader()), g_hp->leader());
}
}
void send_and_check_host_port_by_serialize(const host_port &hp,
dsn_msg_serialize_format t)
{
- auto hp_str = hp.to_string();
+ const auto &hp_str = hp.to_string();
::dsn::rpc_address server("localhost", 20101);
dsn::message_ptr msg_ptr =
dsn::message_ex::create_request(RPC_TEST_THRIFT_HOST_PORT_PARSER);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]