This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new e655400af feat(FQDN): Implemention of struct host_port_group (#1436)
e655400af is described below
commit e655400afeae5cd3d267124d0a45f7a0b26d2d54
Author: liguohao <[email protected]>
AuthorDate: Thu Apr 27 22:07:36 2023 +0800
feat(FQDN): Implemention of struct host_port_group (#1436)
issue: https://github.com/apache/incubator-pegasus/issues/1404
Implement of struct `group_host_port` used by 'rpc_host_port'.
Also add some code on class `rpc_host_port` whose be related to
`group_host_port`
used.
---
src/runtime/rpc/group_host_port.h | 257 ++++++++++++++++++++++++++++++++++++
src/runtime/rpc/rpc_host_port.cpp | 25 +++-
src/runtime/rpc/rpc_host_port.h | 15 ++-
src/runtime/test/host_port_test.cpp | 71 +++++++++-
4 files changed, 360 insertions(+), 8 deletions(-)
diff --git a/src/runtime/rpc/group_host_port.h
b/src/runtime/rpc/group_host_port.h
new file mode 100644
index 000000000..cf7cab19b
--- /dev/null
+++ b/src/runtime/rpc/group_host_port.h
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "runtime/rpc/group_address.h"
+#include "runtime/rpc/group_host_port.h"
+#include "runtime/rpc/rpc_host_port.h"
+#include "utils/autoref_ptr.h"
+#include "utils/fmt_logging.h"
+#include "utils/rand.h"
+#include "utils/synchronize.h"
+
+namespace dsn {
+
+static constexpr int kInvalidIndex = -1;
+
+// Base on group_address, a group of host_post.
+// Please use host_port like example if you want call group of host_port.
+// e.g.
+//
+// dsn::rpc_host_port group;
+// group.assign_group("test");
+// group.group_host_port()->add(host_port("test_fqdn", 34601));
+// group.group_host_port()->add(host_port("test_fqdn", 34602));
+// group.group_host_port()->add(host_port("test_fqdn", 34603));
+//
+class rpc_group_host_port : public ref_counter
+{
+public:
+ rpc_group_host_port(const char *name);
+ rpc_group_host_port(const rpc_group_address *g_addr);
+ rpc_group_host_port(const rpc_group_host_port &other);
+ rpc_group_host_port &operator=(const rpc_group_host_port &other);
+ bool add(const host_port &hp) WARN_UNUSED_RESULT;
+ void add_list(const std::vector<host_port> &hps)
+ {
+ for (const auto &hp : hps) {
+ LOG_WARNING_IF(!add(hp), "duplicate adress {}", hp);
+ }
+ }
+ void set_leader(const host_port &hp);
+ bool remove(const host_port &hp) WARN_UNUSED_RESULT;
+ bool contains(const host_port &hp) const WARN_UNUSED_RESULT;
+ int count() const;
+
+ const std::vector<host_port> &members() const
+ {
+ arl_t l(_lock);
+ return _members;
+ }
+
+ uint32_t random_index_unlocked() const;
+ host_port random_member() const
+ {
+ arl_t l(_lock);
+ return _members.empty() ? host_port::s_invalid_host_port
+ : _members[random_index_unlocked()];
+ }
+ host_port next(const host_port ¤t) const;
+ host_port leader() const
+ {
+ arl_t l(_lock);
+ return _leader_index >= 0 ? _members[_leader_index] :
host_port::s_invalid_host_port;
+ }
+ void leader_forward();
+ // We should use 'possible_leader' for rpc group call, but not 'leader()'.
+ // Caz we not have leader sometimes in initialization phase.
+ host_port possible_leader();
+
+ // failure_detector should avoid failure detecting logic is affected by
rpc failure or rpc
+ // forwarding. So we need a switch to contronl update leader automatically.
+ bool is_update_leader_automatically() const { return
_update_leader_automatically; }
+ void set_update_leader_automatically(bool value) {
_update_leader_automatically = value; }
+ const char *name() const { return _name.c_str(); }
+
+private:
+ typedef std::vector<host_port> members_t;
+ typedef utils::auto_read_lock arl_t;
+ typedef utils::auto_write_lock awl_t;
+
+ mutable utils::rw_lock_nr _lock;
+ members_t _members;
+ // It's not always valid even if _members is not empty.
+ // Initialization is a possible value, which needs to be negotiated.
+ int _leader_index;
+ bool _update_leader_automatically;
+ std::string _name;
+};
+
+// ------------------ inline implementation --------------------
+
+inline rpc_group_host_port::rpc_group_host_port(const char *name)
+{
+ _name = name;
+ _leader_index = kInvalidIndex;
+ _update_leader_automatically = true;
+}
+
+inline rpc_group_host_port::rpc_group_host_port(const rpc_group_host_port
&other)
+{
+ _name = other._name;
+ _leader_index = other._leader_index;
+ _update_leader_automatically = other._update_leader_automatically;
+ _members = other._members;
+}
+
+inline rpc_group_host_port::rpc_group_host_port(const rpc_group_address
*g_addr)
+{
+ _name = g_addr->name();
+ for (const auto &addr : g_addr->members()) {
+ CHECK_TRUE(add(host_port(addr)));
+ }
+ _update_leader_automatically = g_addr->is_update_leader_automatically();
+ set_leader(host_port(g_addr->leader()));
+}
+
+inline rpc_group_host_port &rpc_group_host_port::operator=(const
rpc_group_host_port &other)
+{
+ if (this == &other) {
+ return *this;
+ }
+ _name = other._name;
+ _leader_index = other._leader_index;
+ _update_leader_automatically = other._update_leader_automatically;
+ _members = other._members;
+ return *this;
+}
+
+inline bool rpc_group_host_port::add(const host_port &hp)
+{
+ CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must
be ipv4");
+
+ awl_t l(_lock);
+ if (_members.end() == std::find(_members.begin(), _members.end(), hp)) {
+ _members.push_back(hp);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+inline void rpc_group_host_port::leader_forward()
+{
+ awl_t l(_lock);
+ if (_members.empty()) {
+ return;
+ }
+ _leader_index = (_leader_index + 1) % _members.size();
+}
+
+inline void rpc_group_host_port::set_leader(const host_port &hp)
+{
+ CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must
be ipv4");
+ awl_t l(_lock);
+ if (hp.is_invalid()) {
+ _leader_index = kInvalidIndex;
+ return;
+ }
+ for (int i = 0; i < _members.size(); i++) {
+ if (_members[i] == hp) {
+ _leader_index = i;
+ return;
+ }
+ }
+
+ _members.push_back(hp);
+ _leader_index = static_cast<int>(_members.size() - 1);
+}
+
+inline uint32_t rpc_group_host_port::random_index_unlocked() const
+{
+ CHECK(!_members.empty(), "invaild group member size");
+ return rand::next_u32(0, static_cast<uint32_t>(_members.size() - 1));
+}
+
+inline host_port rpc_group_host_port::possible_leader()
+{
+ awl_t l(_lock);
+ if (_members.empty()) {
+ return host_port::s_invalid_host_port;
+ }
+ if (_leader_index == kInvalidIndex) {
+ _leader_index = random_index_unlocked();
+ }
+ return _members[_leader_index];
+}
+
+inline bool rpc_group_host_port::remove(const host_port &hp)
+{
+ awl_t l(_lock);
+ auto it = std::find(_members.begin(), _members.end(), hp);
+ if (it == _members.end()) {
+ return false;
+ }
+
+ if (kInvalidIndex != _leader_index && hp == _members[_leader_index]) {
+ _leader_index = kInvalidIndex;
+ }
+
+ _members.erase(it);
+
+ return true;
+}
+
+inline bool rpc_group_host_port::contains(const host_port &hp) const
+{
+ arl_t l(_lock);
+ return _members.end() != std::find(_members.begin(), _members.end(), hp);
+}
+
+inline int rpc_group_host_port::count() const
+{
+ arl_t l(_lock);
+ return _members.size();
+}
+
+inline host_port rpc_group_host_port::next(const host_port ¤t) const
+{
+ arl_t l(_lock);
+ if (_members.empty()) {
+ return host_port::s_invalid_host_port;
+ }
+
+ if (current.is_invalid()) {
+ return _members[random_index_unlocked()];
+ }
+
+ auto it = std::find(_members.begin(), _members.end(), current);
+ if (it == _members.end()) {
+ return _members[random_index_unlocked()];
+ }
+
+ it++;
+ return it == _members.end() ? _members[0] : *it;
+}
+
+} // namespace dsn
diff --git a/src/runtime/rpc/rpc_host_port.cpp
b/src/runtime/rpc/rpc_host_port.cpp
index 9c71a7f26..97b75b6ea 100644
--- a/src/runtime/rpc/rpc_host_port.cpp
+++ b/src/runtime/rpc/rpc_host_port.cpp
@@ -21,6 +21,7 @@
#include <utility>
#include "fmt/core.h"
+#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/utils.h"
@@ -43,8 +44,9 @@ host_port::host_port(rpc_address addr)
addr.ipv4_str());
_port = addr.port();
} break;
- case HOST_TYPE_GROUP:
- CHECK(false, "type HOST_TYPE_GROUP not support!");
+ case HOST_TYPE_GROUP: {
+ _group_host_port = new rpc_group_host_port(addr.group_address());
+ } break;
default:
break;
}
@@ -59,7 +61,8 @@ void host_port::reset()
_port = 0;
break;
case HOST_TYPE_GROUP:
- CHECK(false, "type HOST_TYPE_GROUP not support!");
+ group_host_port()->release_ref();
+ break;
default:
break;
}
@@ -79,7 +82,9 @@ host_port &host_port::operator=(const host_port &other)
_port = other.port();
break;
case HOST_TYPE_GROUP:
- CHECK(false, "type HOST_TYPE_GROUP not support!");
+ _group_host_port = other._group_host_port;
+ group_host_port()->add_ref();
+ break;
default:
break;
}
@@ -93,9 +98,19 @@ std::string host_port::to_string() const
case HOST_TYPE_IPV4:
return fmt::format("{}:{}", _host, _port);
case HOST_TYPE_GROUP:
- CHECK(false, "type HOST_TYPE_GROUP not support!");
+ return fmt::format("address group {}", group_host_port()->name());
default:
return "invalid address";
}
}
+
+void host_port::assign_group(const char *name)
+{
+ reset();
+ _type = HOST_TYPE_GROUP;
+ _group_host_port = new rpc_group_host_port(name);
+ // take the lifetime of rpc_uri_address, release_ref when change value or
call destructor
+ _group_host_port->add_ref();
+}
+
} // namespace dsn
diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h
index b036e1d41..0a82afde0 100644
--- a/src/runtime/rpc/rpc_host_port.h
+++ b/src/runtime/rpc/rpc_host_port.h
@@ -27,10 +27,13 @@
#include <string>
#include "runtime/rpc/rpc_address.h"
+#include "utils/autoref_ptr.h"
#include "utils/fmt_logging.h"
namespace dsn {
+class rpc_group_host_port;
+
class host_port
{
public:
@@ -58,10 +61,18 @@ public:
return os << hp.to_string();
}
+ rpc_group_host_port *group_host_port() const
+ {
+ CHECK_NOTNULL(_group_host_port, "group_host_port cannot be null!");
+ return _group_host_port;
+ }
+ void assign_group(const char *name);
+
private:
std::string _host;
uint16_t _port = 0;
dsn_host_type_t _type = HOST_TYPE_INVALID;
+ ref_ptr<rpc_group_host_port> _group_host_port;
};
inline bool operator==(const host_port &hp1, const host_port &hp2)
@@ -78,7 +89,7 @@ inline bool operator==(const host_port &hp1, const host_port
&hp2)
case HOST_TYPE_IPV4:
return hp1.host() == hp2.host() && hp1.port() == hp2.port();
case HOST_TYPE_GROUP:
- CHECK(false, "type HOST_TYPE_GROUP not support!");
+ return hp1.group_host_port() == hp2.group_host_port();
default:
return true;
}
@@ -98,7 +109,7 @@ struct hash<::dsn::host_port>
case HOST_TYPE_IPV4:
return std::hash<std::string>()(hp.host()) ^
std::hash<uint16_t>()(hp.port());
case HOST_TYPE_GROUP:
- CHECK(false, "type HOST_TYPE_GROUP not support!");
+ return std::hash<void *>()(hp.group_host_port());
default:
return 0;
}
diff --git a/src/runtime/test/host_port_test.cpp
b/src/runtime/test/host_port_test.cpp
index e1adc74de..dfa48d2dd 100644
--- a/src/runtime/test/host_port_test.cpp
+++ b/src/runtime/test/host_port_test.cpp
@@ -17,11 +17,13 @@
* under the License.
*/
-#include <gtest/gtest.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
#include <string>
+#include <vector>
+#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
@@ -74,4 +76,71 @@ TEST(host_port_test, operators)
ASSERT_FALSE(hp.is_invalid());
ASSERT_TRUE(hp2.is_invalid());
}
+
+TEST(host_port_test, rpc_group_host_port)
+{
+ host_port hp("localhost", 8080);
+ host_port hp2("localhost", 8081);
+ host_port invalid_hp;
+
+ host_port hp_grp;
+ hp_grp.assign_group("test_group");
+ ASSERT_EQ(HOST_TYPE_GROUP, hp_grp.type());
+ rpc_group_host_port *g = hp_grp.group_host_port();
+ ASSERT_EQ(std::string("test_group"), g->name());
+
+ // invalid_hp
+ ASSERT_FALSE(g->remove(hp));
+ ASSERT_FALSE(g->contains(hp));
+ ASSERT_EQ(0u, g->members().size());
+ ASSERT_EQ(invalid_hp, g->random_member());
+ ASSERT_EQ(invalid_hp, g->next(hp));
+ ASSERT_EQ(invalid_hp, g->leader());
+ ASSERT_EQ(invalid_hp, g->possible_leader());
+
+ // hp
+ g->set_leader(hp);
+ ASSERT_TRUE(g->contains(hp));
+ ASSERT_EQ(1u, g->members().size());
+ ASSERT_EQ(hp, g->members().at(0));
+ ASSERT_EQ(hp, g->leader());
+ ASSERT_EQ(hp, g->possible_leader());
+
+ // hp2
+ g->set_leader(hp2);
+ ASSERT_TRUE(g->contains(hp));
+ ASSERT_TRUE(g->contains(hp2));
+ ASSERT_EQ(2u, g->members().size());
+ ASSERT_EQ(hp, g->members().at(0));
+ ASSERT_EQ(hp2, g->members().at(1));
+ ASSERT_EQ(hp2, g->leader());
+ ASSERT_EQ(hp2, g->possible_leader());
+ ASSERT_EQ(hp, g->next(hp2));
+ ASSERT_EQ(hp2, g->next(hp));
+
+ // change leader
+ g->set_leader(hp);
+ ASSERT_TRUE(g->contains(hp));
+ ASSERT_TRUE(g->contains(hp2));
+ ASSERT_EQ(2u, g->members().size());
+ ASSERT_EQ(hp, g->members().at(0));
+ ASSERT_EQ(hp2, g->members().at(1));
+ ASSERT_EQ(hp, g->leader());
+ g->leader_forward();
+ ASSERT_EQ(hp2, g->leader());
+
+ // del
+ ASSERT_TRUE(g->remove(hp));
+ ASSERT_FALSE(g->contains(hp));
+ ASSERT_TRUE(g->contains(hp2));
+ ASSERT_EQ(1u, g->members().size());
+ ASSERT_EQ(hp2, g->members().at(0));
+ ASSERT_EQ(invalid_hp, g->leader());
+
+ ASSERT_TRUE(g->remove(hp2));
+ ASSERT_FALSE(g->contains(hp2));
+ ASSERT_EQ(0u, g->members().size());
+ ASSERT_EQ(invalid_hp, g->leader());
+}
+
} // namespace dsn
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]