This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new e655400af feat(FQDN): Implemention of struct host_port_group (#1436)
e655400af is described below

commit e655400afeae5cd3d267124d0a45f7a0b26d2d54
Author: liguohao <[email protected]>
AuthorDate: Thu Apr 27 22:07:36 2023 +0800

    feat(FQDN): Implemention of struct host_port_group (#1436)
    
    issue: https://github.com/apache/incubator-pegasus/issues/1404
    
    Implement the struct `group_host_port` used by `rpc_host_port`.
    Also add the code in class `rpc_host_port` that is related to
    `group_host_port`.
---
 src/runtime/rpc/group_host_port.h   | 257 ++++++++++++++++++++++++++++++++++++
 src/runtime/rpc/rpc_host_port.cpp   |  25 +++-
 src/runtime/rpc/rpc_host_port.h     |  15 ++-
 src/runtime/test/host_port_test.cpp |  71 +++++++++-
 4 files changed, 360 insertions(+), 8 deletions(-)

diff --git a/src/runtime/rpc/group_host_port.h 
b/src/runtime/rpc/group_host_port.h
new file mode 100644
index 000000000..cf7cab19b
--- /dev/null
+++ b/src/runtime/rpc/group_host_port.h
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "runtime/rpc/group_address.h"
+#include "runtime/rpc/group_host_port.h"
+#include "runtime/rpc/rpc_host_port.h"
+#include "utils/autoref_ptr.h"
+#include "utils/fmt_logging.h"
+#include "utils/rand.h"
+#include "utils/synchronize.h"
+
+namespace dsn {
+
+static constexpr int kInvalidIndex = -1;
+
+// A named, reference-counted group of host_port members, modelled on rpc_group_address.
+// Access a group through a host_port of type HOST_TYPE_GROUP, as in the example below.
+//  e.g.
+//
+//  dsn::host_port group;
+//  group.assign_group("test");
+//  group.group_host_port()->add(host_port("test_fqdn", 34601));
+//  group.group_host_port()->add(host_port("test_fqdn", 34602));
+//  group.group_host_port()->add(host_port("test_fqdn", 34603));
+//
+class rpc_group_host_port : public ref_counter
+{
+public:
+    // Creates an empty group called 'name' that has no leader yet.
+    rpc_group_host_port(const char *name);
+    // Builds a group from the name, members, leader and auto-update flag of 'g_addr'.
+    rpc_group_host_port(const rpc_group_address *g_addr);
+    rpc_group_host_port(const rpc_group_host_port &other);
+    rpc_group_host_port &operator=(const rpc_group_host_port &other);
+
+    // Appends 'hp' to the group. Returns false, changing nothing, if 'hp' is already a member.
+    bool add(const host_port &hp) WARN_UNUSED_RESULT;
+    // Adds every element of 'hps'; duplicates are skipped with a warning log.
+    void add_list(const std::vector<host_port> &hps)
+    {
+        for (const auto &hp : hps) {
+            LOG_WARNING_IF(!add(hp), "duplicate adress {}", hp);
+        }
+    }
+    // Makes 'hp' the leader, adding it to the group first if it is not a member yet.
+    void set_leader(const host_port &hp);
+    // Removes 'hp' from the group. Returns false if 'hp' was not a member.
+    bool remove(const host_port &hp) WARN_UNUSED_RESULT;
+    bool contains(const host_port &hp) const WARN_UNUSED_RESULT;
+    int count() const;
+
+    // NOTE: the read lock is released when this function returns, so the returned reference
+    // is not protected against concurrent add()/remove()/set_leader() calls.
+    const std::vector<host_port> &members() const
+    {
+        arl_t l(_lock);
+        return _members;
+    }
+
+    // Picks an index into '_members'. The caller must hold '_lock' and '_members' must not
+    // be empty.
+    uint32_t random_index_unlocked() const;
+    // Returns a randomly chosen member, or the invalid host_port if the group is empty.
+    host_port random_member() const
+    {
+        arl_t l(_lock);
+        return _members.empty() ? host_port::s_invalid_host_port
+                                : _members[random_index_unlocked()];
+    }
+    // Returns the member stored after 'current', wrapping around to the first one.
+    host_port next(const host_port &current) const;
+    // Returns the current leader, or the invalid host_port if there is none.
+    host_port leader() const
+    {
+        arl_t l(_lock);
+        // Check the upper bound as well as the sign: an index that no longer fits '_members'
+        // must be reported as "no leader" instead of reading past the end of the vector.
+        const bool has_leader =
+            _leader_index >= 0 &&
+            static_cast<members_t::size_type>(_leader_index) < _members.size();
+        return has_leader ? _members[_leader_index] : host_port::s_invalid_host_port;
+    }
+    // Moves leadership to the next member in storage order (no-op on an empty group).
+    void leader_forward();
+    // Use 'possible_leader()' rather than 'leader()' for rpc group calls,
+    // because there may be no leader yet during the initialization phase.
+    host_port possible_leader();
+
+    // failure_detector must keep its failure detecting logic unaffected by rpc failure or rpc
+    // forwarding, so we need a switch to control whether the leader is updated automatically.
+    bool is_update_leader_automatically() const { return _update_leader_automatically; }
+    void set_update_leader_automatically(bool value) { _update_leader_automatically = value; }
+    const char *name() const { return _name.c_str(); }
+
+private:
+    typedef std::vector<host_port> members_t;
+    typedef utils::auto_read_lock arl_t;
+    typedef utils::auto_write_lock awl_t;
+
+    mutable utils::rw_lock_nr _lock;
+    members_t _members;
+    // It's not always valid even if _members is not empty.
+    // Right after initialization there may be no leader; it still has to be negotiated.
+    // Default-initialized so that no constructor can leave it indeterminate.
+    int _leader_index = kInvalidIndex;
+    bool _update_leader_automatically = true;
+    std::string _name;
+};
+
+// ------------------ inline implementation --------------------
+
+// Creates an empty group named 'name': no members, no leader, and automatic leader
+// updating switched on.
+inline rpc_group_host_port::rpc_group_host_port(const char *name)
+    : _leader_index(kInvalidIndex), _update_leader_automatically(true), _name(name)
+{
+}
+
+// Copies the members, leader index, auto-update flag and name of 'other'.
+// The lock is not copied; the new object gets its own default-constructed lock.
+inline rpc_group_host_port::rpc_group_host_port(const rpc_group_host_port &other)
+    : _members(other._members),
+      _leader_index(other._leader_index),
+      _update_leader_automatically(other._update_leader_automatically),
+      _name(other._name)
+{
+}
+
+// Builds a host_port group mirroring 'g_addr': same name, same members (each converted
+// with host_port(rpc_address)), same auto-update flag and, if there is one, same leader.
+inline rpc_group_host_port::rpc_group_host_port(const rpc_group_address *g_addr)
+{
+    _name = g_addr->name();
+    // Start from a well-defined "no leader" state instead of an indeterminate value.
+    _leader_index = kInvalidIndex;
+    _update_leader_automatically = g_addr->is_update_leader_automatically();
+    for (const auto &addr : g_addr->members()) {
+        CHECK_TRUE(add(host_port(addr)));
+    }
+
+    // set_leader() requires an IPv4 host_port, so only forward a leader that actually exists.
+    // (Presumably g_addr->leader() yields an invalid address when the source group has not
+    // negotiated a leader yet - that case must not abort the conversion.)
+    const host_port leader(g_addr->leader());
+    if (!leader.is_invalid()) {
+        set_leader(leader);
+    }
+}
+
+// Copy-assigns the members, leader index, auto-update flag and name of 'other'.
+// Self-assignment leaves the object untouched.
+inline rpc_group_host_port &rpc_group_host_port::operator=(const rpc_group_host_port &other)
+{
+    if (this != &other) {
+        _members = other._members;
+        _leader_index = other._leader_index;
+        _update_leader_automatically = other._update_leader_automatically;
+        _name = other._name;
+    }
+    return *this;
+}
+
+// Appends 'hp' to the member list under the write lock.
+// Returns true if 'hp' was added, false if it was already a member.
+// 'hp' must be of type HOST_TYPE_IPV4 (enforced by CHECK_EQ_MSG).
+inline bool rpc_group_host_port::add(const host_port &hp)
+{
+    CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must be ipv4");
+
+    awl_t l(_lock);
+    const bool already_member = std::find(_members.begin(), _members.end(), hp) != _members.end();
+    if (already_member) {
+        return false;
+    }
+    _members.push_back(hp);
+    return true;
+}
+
+// Advances the leader index by one, wrapping around at the end of the member list.
+// Does nothing when the group has no members.
+inline void rpc_group_host_port::leader_forward()
+{
+    awl_t l(_lock);
+    if (!_members.empty()) {
+        _leader_index = (_leader_index + 1) % _members.size();
+    }
+}
+
+// Sets the leader of the group.
+//  - An invalid 'hp' clears the leader.
+//  - An 'hp' that is already a member becomes the leader.
+//  - Any other 'hp' is appended to the members and becomes the leader.
+// A valid 'hp' must be of type HOST_TYPE_IPV4 (enforced by CHECK_EQ_MSG).
+inline void rpc_group_host_port::set_leader(const host_port &hp)
+{
+    awl_t l(_lock);
+    // Handle the "clear the leader" request before the type check: an invalid host_port is
+    // not IPv4, so checking the type first would make this branch unreachable.
+    if (hp.is_invalid()) {
+        _leader_index = kInvalidIndex;
+        return;
+    }
+    CHECK_EQ_MSG(hp.type(), HOST_TYPE_IPV4, "rpc group host_port member must be ipv4");
+
+    const auto pos = std::find(_members.begin(), _members.end(), hp);
+    if (pos != _members.end()) {
+        _leader_index = static_cast<int>(pos - _members.begin());
+        return;
+    }
+
+    // Not a member yet: append it and point the leader at the new last slot.
+    _members.push_back(hp);
+    _leader_index = static_cast<int>(_members.size() - 1);
+}
+
+// Returns an index into '_members' obtained from rand::next_u32 with bounds 0 and size - 1.
+// The group must not be empty (enforced by CHECK); no lock is taken here.
+inline uint32_t rpc_group_host_port::random_index_unlocked() const
+{
+    CHECK(!_members.empty(), "invaild group member size");
+    const auto last_index = static_cast<uint32_t>(_members.size() - 1);
+    return rand::next_u32(0, last_index);
+}
+
+// Returns the leader, first choosing a random member as leader if none is set.
+// Returns the invalid host_port when the group is empty.
+inline host_port rpc_group_host_port::possible_leader()
+{
+    awl_t l(_lock);
+    if (!_members.empty()) {
+        if (kInvalidIndex == _leader_index) {
+            _leader_index = random_index_unlocked();
+        }
+        return _members[_leader_index];
+    }
+    return host_port::s_invalid_host_port;
+}
+
+// Removes 'hp' from the group under the write lock.
+// Returns true if 'hp' was a member and has been removed, false otherwise.
+//
+// The leader is cleared when the removed member is the leader itself, and also when it is
+// stored before the leader: erasing shifts every later member down by one slot, so the old
+// leader index would otherwise refer to a different member or lie past the end of
+// '_members'. A cleared leader is re-selected by possible_leader() or set_leader().
+inline bool rpc_group_host_port::remove(const host_port &hp)
+{
+    awl_t l(_lock);
+    const auto it = std::find(_members.begin(), _members.end(), hp);
+    if (it == _members.end()) {
+        return false;
+    }
+
+    const int removed_index = static_cast<int>(it - _members.begin());
+    if (kInvalidIndex != _leader_index && removed_index <= _leader_index) {
+        _leader_index = kInvalidIndex;
+    }
+
+    _members.erase(it);
+
+    return true;
+}
+
+// Returns true if 'hp' is currently a member of the group (checked under the read lock).
+inline bool rpc_group_host_port::contains(const host_port &hp) const
+{
+    arl_t l(_lock);
+    const auto pos = std::find(_members.begin(), _members.end(), hp);
+    return pos != _members.end();
+}
+
+// Returns the number of members in the group (read under the read lock).
+inline int rpc_group_host_port::count() const
+{
+    arl_t l(_lock);
+    return static_cast<int>(_members.size());
+}
+
+// Returns the member that follows 'current' in storage order, wrapping to the first member
+// after the last one. If 'current' is invalid or not a member, a member picked via
+// random_index_unlocked() is returned. Returns the invalid host_port for an empty group.
+inline host_port rpc_group_host_port::next(const host_port &current) const
+{
+    arl_t l(_lock);
+    if (_members.empty()) {
+        return host_port::s_invalid_host_port;
+    }
+
+    // Only search for 'current' when it is valid; "invalid" and "not found" are handled alike.
+    const auto pos = current.is_invalid()
+                         ? _members.end()
+                         : std::find(_members.begin(), _members.end(), current);
+    if (pos == _members.end()) {
+        return _members[random_index_unlocked()];
+    }
+
+    const auto following = pos + 1;
+    if (following == _members.end()) {
+        return _members[0];
+    }
+    return *following;
+}
+
+} // namespace dsn
diff --git a/src/runtime/rpc/rpc_host_port.cpp 
b/src/runtime/rpc/rpc_host_port.cpp
index 9c71a7f26..97b75b6ea 100644
--- a/src/runtime/rpc/rpc_host_port.cpp
+++ b/src/runtime/rpc/rpc_host_port.cpp
@@ -21,6 +21,7 @@
 #include <utility>
 
 #include "fmt/core.h"
+#include "runtime/rpc/group_host_port.h"
 #include "runtime/rpc/rpc_host_port.h"
 #include "utils/utils.h"
 
@@ -43,8 +44,9 @@ host_port::host_port(rpc_address addr)
               addr.ipv4_str());
         _port = addr.port();
     } break;
-    case HOST_TYPE_GROUP:
-        CHECK(false, "type HOST_TYPE_GROUP not support!");
+    case HOST_TYPE_GROUP: {
+        _group_host_port = new rpc_group_host_port(addr.group_address());
+    } break;
     default:
         break;
     }
@@ -59,7 +61,8 @@ void host_port::reset()
         _port = 0;
         break;
     case HOST_TYPE_GROUP:
-        CHECK(false, "type HOST_TYPE_GROUP not support!");
+        group_host_port()->release_ref();
+        break;
     default:
         break;
     }
@@ -79,7 +82,9 @@ host_port &host_port::operator=(const host_port &other)
         _port = other.port();
         break;
     case HOST_TYPE_GROUP:
-        CHECK(false, "type HOST_TYPE_GROUP not support!");
+        _group_host_port = other._group_host_port;
+        group_host_port()->add_ref();
+        break;
     default:
         break;
     }
@@ -93,9 +98,19 @@ std::string host_port::to_string() const
     case HOST_TYPE_IPV4:
         return fmt::format("{}:{}", _host, _port);
     case HOST_TYPE_GROUP:
-        CHECK(false, "type HOST_TYPE_GROUP not support!");
+        return fmt::format("address group {}", group_host_port()->name());
     default:
         return "invalid address";
     }
 }
+
+// Turns this host_port into a HOST_TYPE_GROUP value backed by a newly allocated
+// rpc_group_host_port called 'name'. reset() runs first, before the new type and group
+// are assigned.
+void host_port::assign_group(const char *name)
+{
+    reset();
+    _type = HOST_TYPE_GROUP;
+    _group_host_port = new rpc_group_host_port(name);
+    // Take a reference on the rpc_group_host_port; the HOST_TYPE_GROUP branch of reset()
+    // calls release_ref() to drop it.
+    // NOTE(review): _group_host_port is declared as ref_ptr<rpc_group_host_port>, which
+    // presumably holds a reference of its own - confirm the counts stay balanced.
+    _group_host_port->add_ref();
+}
+
 } // namespace dsn
diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h
index b036e1d41..0a82afde0 100644
--- a/src/runtime/rpc/rpc_host_port.h
+++ b/src/runtime/rpc/rpc_host_port.h
@@ -27,10 +27,13 @@
 #include <string>
 
 #include "runtime/rpc/rpc_address.h"
+#include "utils/autoref_ptr.h"
 #include "utils/fmt_logging.h"
 
 namespace dsn {
 
+class rpc_group_host_port;
+
 class host_port
 {
 public:
@@ -58,10 +61,18 @@ public:
         return os << hp.to_string();
     }
 
+    // Returns the rpc_group_host_port backing this host_port.
+    // CHECK_NOTNULL guards against the group being unset, so this is presumably meant to be
+    // called only on HOST_TYPE_GROUP values - confirm against callers.
+    rpc_group_host_port *group_host_port() const
+    {
+        CHECK_NOTNULL(_group_host_port, "group_host_port cannot be null!");
+        return _group_host_port;
+    }
+    void assign_group(const char *name);
+
 private:
     std::string _host;
     uint16_t _port = 0;
     dsn_host_type_t _type = HOST_TYPE_INVALID;
+    ref_ptr<rpc_group_host_port> _group_host_port;
 };
 
 inline bool operator==(const host_port &hp1, const host_port &hp2)
@@ -78,7 +89,7 @@ inline bool operator==(const host_port &hp1, const host_port 
&hp2)
     case HOST_TYPE_IPV4:
         return hp1.host() == hp2.host() && hp1.port() == hp2.port();
     case HOST_TYPE_GROUP:
-        CHECK(false, "type HOST_TYPE_GROUP not support!");
+        return hp1.group_host_port() == hp2.group_host_port();
     default:
         return true;
     }
@@ -98,7 +109,7 @@ struct hash<::dsn::host_port>
         case HOST_TYPE_IPV4:
             return std::hash<std::string>()(hp.host()) ^ 
std::hash<uint16_t>()(hp.port());
         case HOST_TYPE_GROUP:
-            CHECK(false, "type HOST_TYPE_GROUP not support!");
+            return std::hash<void *>()(hp.group_host_port());
         default:
             return 0;
         }
diff --git a/src/runtime/test/host_port_test.cpp 
b/src/runtime/test/host_port_test.cpp
index e1adc74de..dfa48d2dd 100644
--- a/src/runtime/test/host_port_test.cpp
+++ b/src/runtime/test/host_port_test.cpp
@@ -17,11 +17,13 @@
  * under the License.
  */
 
-#include <gtest/gtest.h>
 #include <gtest/gtest-message.h>
 #include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
 #include <string>
+#include <vector>
 
+#include "runtime/rpc/group_host_port.h"
 #include "runtime/rpc/rpc_address.h"
 #include "runtime/rpc/rpc_host_port.h"
 
@@ -74,4 +76,71 @@ TEST(host_port_test, operators)
     ASSERT_FALSE(hp.is_invalid());
     ASSERT_TRUE(hp2.is_invalid());
 }
+
+// Walks an rpc_group_host_port through its life cycle: empty group, first leader,
+// second leader, leader change/forwarding, and removal of both members.
+TEST(host_port_test, rpc_group_host_port)
+{
+    host_port hp("localhost", 8080);
+    host_port hp2("localhost", 8081);
+    host_port invalid_hp;
+
+    host_port hp_grp;
+    hp_grp.assign_group("test_group");
+    ASSERT_EQ(HOST_TYPE_GROUP, hp_grp.type());
+    rpc_group_host_port *g = hp_grp.group_host_port();
+    ASSERT_EQ(std::string("test_group"), g->name());
+
+    // Empty group: nothing to remove or find, and every lookup yields the invalid host_port.
+    ASSERT_FALSE(g->remove(hp));
+    ASSERT_FALSE(g->contains(hp));
+    ASSERT_EQ(0u, g->members().size());
+    ASSERT_EQ(invalid_hp, g->random_member());
+    ASSERT_EQ(invalid_hp, g->next(hp));
+    ASSERT_EQ(invalid_hp, g->leader());
+    ASSERT_EQ(invalid_hp, g->possible_leader());
+
+    // set_leader() on a non-member adds hp and makes it the leader.
+    g->set_leader(hp);
+    ASSERT_TRUE(g->contains(hp));
+    ASSERT_EQ(1u, g->members().size());
+    ASSERT_EQ(hp, g->members().at(0));
+    ASSERT_EQ(hp, g->leader());
+    ASSERT_EQ(hp, g->possible_leader());
+
+    // set_leader() with hp2 appends it after hp and moves leadership to it;
+    // next() cycles through the two members.
+    g->set_leader(hp2);
+    ASSERT_TRUE(g->contains(hp));
+    ASSERT_TRUE(g->contains(hp2));
+    ASSERT_EQ(2u, g->members().size());
+    ASSERT_EQ(hp, g->members().at(0));
+    ASSERT_EQ(hp2, g->members().at(1));
+    ASSERT_EQ(hp2, g->leader());
+    ASSERT_EQ(hp2, g->possible_leader());
+    ASSERT_EQ(hp, g->next(hp2));
+    ASSERT_EQ(hp2, g->next(hp));
+
+    // Changing the leader to an existing member keeps the member order;
+    // leader_forward() then advances leadership from hp to hp2.
+    g->set_leader(hp);
+    ASSERT_TRUE(g->contains(hp));
+    ASSERT_TRUE(g->contains(hp2));
+    ASSERT_EQ(2u, g->members().size());
+    ASSERT_EQ(hp, g->members().at(0));
+    ASSERT_EQ(hp2, g->members().at(1));
+    ASSERT_EQ(hp, g->leader());
+    g->leader_forward();
+    ASSERT_EQ(hp2, g->leader());
+
+    // Remove hp, which is stored before the current leader hp2.
+    // NOTE(review): the expectation that no leader remains depends on how remove() and
+    // leader() treat the leader index after the members shift - keep them in sync.
+    ASSERT_TRUE(g->remove(hp));
+    ASSERT_FALSE(g->contains(hp));
+    ASSERT_TRUE(g->contains(hp2));
+    ASSERT_EQ(1u, g->members().size());
+    ASSERT_EQ(hp2, g->members().at(0));
+    ASSERT_EQ(invalid_hp, g->leader());
+
+    // Remove the last member: the group is empty again and has no leader.
+    ASSERT_TRUE(g->remove(hp2));
+    ASSERT_FALSE(g->contains(hp2));
+    ASSERT_EQ(0u, g->members().size());
+    ASSERT_EQ(invalid_hp, g->leader());
+}
+
 } // namespace dsn


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to