This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 011dabe9 Make sure WeightedRandomizedLoadBalancer can traverse the 
whole server list (#2953)
011dabe9 is described below

commit 011dabe9c896705fd539df6467ebab1c3f02dcf1
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Thu May 8 17:56:00 2025 +0800

    Make sure WeightedRandomizedLoadBalancer can traverse the whole server list 
(#2953)
    
    * Make sure WeightedRandomizedLoadBalancer can traverse the whole server 
list
    
    * Only mark unavailable server as traversed
---
 src/brpc/load_balancer.h                           |  7 +++
 src/brpc/policy/dynpart_load_balancer.h            | 16 +++---
 src/brpc/policy/locality_aware_load_balancer.h     | 20 ++++----
 src/brpc/policy/randomized_load_balancer.cpp       |  8 ---
 src/brpc/policy/randomized_load_balancer.h         | 16 +++---
 src/brpc/policy/round_robin_load_balancer.cpp      |  8 ---
 src/brpc/policy/round_robin_load_balancer.h        | 16 +++---
 .../policy/weighted_randomized_load_balancer.cpp   | 57 +++++++++++++++++-----
 .../policy/weighted_randomized_load_balancer.h     | 20 ++++----
 .../policy/weighted_round_robin_load_balancer.cpp  |  6 +--
 .../policy/weighted_round_robin_load_balancer.h    | 16 +++---
 test/bthread_countdown_event_unittest.cpp          |  5 ++
 12 files changed, 113 insertions(+), 82 deletions(-)

diff --git a/src/brpc/load_balancer.h b/src/brpc/load_balancer.h
index cda0517e..a32b298d 100644
--- a/src/brpc/load_balancer.h
+++ b/src/brpc/load_balancer.h
@@ -184,6 +184,13 @@ inline Extension<const LoadBalancer>* 
LoadBalancerExtension() {
     return Extension<const LoadBalancer>::instance();
 }
 
+inline uint32_t GenRandomStride() {
+    uint32_t prime_offset[] = {
+        #include "bthread/offset_inl.list"
+    };
+    return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
+}
+
 } // namespace brpc
 
 
diff --git a/src/brpc/policy/dynpart_load_balancer.h 
b/src/brpc/policy/dynpart_load_balancer.h
index d3fd9d68..4e8833bf 100644
--- a/src/brpc/policy/dynpart_load_balancer.h
+++ b/src/brpc/policy/dynpart_load_balancer.h
@@ -33,14 +33,14 @@ namespace policy {
 
 class DynPartLoadBalancer : public LoadBalancer {
 public:
-    bool AddServer(const ServerId& id);
-    bool RemoveServer(const ServerId& id);
-    size_t AddServersInBatch(const std::vector<ServerId>& servers);
-    size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
-    int SelectServer(const SelectIn& in, SelectOut* out);
-    DynPartLoadBalancer* New(const butil::StringPiece&) const;
-    void Destroy();
-    void Describe(std::ostream&, const DescribeOptions& options);
+    bool AddServer(const ServerId& id) override;
+    bool RemoveServer(const ServerId& id) override;
+    size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
+    size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
+    int SelectServer(const SelectIn& in, SelectOut* out) override;
+    DynPartLoadBalancer* New(const butil::StringPiece&) const override;
+    void Destroy() override;
+    void Describe(std::ostream&, const DescribeOptions& options) override;
 
 private:
     struct Servers {
diff --git a/src/brpc/policy/locality_aware_load_balancer.h 
b/src/brpc/policy/locality_aware_load_balancer.h
index 4129a2d5..82373a36 100644
--- a/src/brpc/policy/locality_aware_load_balancer.h
+++ b/src/brpc/policy/locality_aware_load_balancer.h
@@ -41,16 +41,16 @@ DECLARE_double(punish_inflight_ratio);
 class LocalityAwareLoadBalancer : public LoadBalancer {
 public:
     LocalityAwareLoadBalancer();
-    ~LocalityAwareLoadBalancer();
-    bool AddServer(const ServerId& id);
-    bool RemoveServer(const ServerId& id);
-    size_t AddServersInBatch(const std::vector<ServerId>& servers);
-    size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
-    LocalityAwareLoadBalancer* New(const butil::StringPiece&) const;
-    void Destroy();
-    int SelectServer(const SelectIn& in, SelectOut* out);
-    void Feedback(const CallInfo& info);
-    void Describe(std::ostream& os, const DescribeOptions& options);
+    ~LocalityAwareLoadBalancer() override;
+    bool AddServer(const ServerId& id) override;
+    bool RemoveServer(const ServerId& id) override;
+    size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
+    size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
+    LocalityAwareLoadBalancer* New(const butil::StringPiece&) const override;
+    void Destroy() override;
+    int SelectServer(const SelectIn& in, SelectOut* out) override;
+    void Feedback(const CallInfo& info) override;
+    void Describe(std::ostream& os, const DescribeOptions& options) override;
 
 private:
     struct TimeInfo {
diff --git a/src/brpc/policy/randomized_load_balancer.cpp 
b/src/brpc/policy/randomized_load_balancer.cpp
index 353074eb..5c4ba447 100644
--- a/src/brpc/policy/randomized_load_balancer.cpp
+++ b/src/brpc/policy/randomized_load_balancer.cpp
@@ -25,14 +25,6 @@
 namespace brpc {
 namespace policy {
 
-const uint32_t prime_offset[] = {
-#include "bthread/offset_inl.list"
-};
-
-inline uint32_t GenRandomStride() {
-    return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
-}
-
 bool RandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
     if (bg.server_list.capacity() < 128) {
         bg.server_list.reserve(128);
diff --git a/src/brpc/policy/randomized_load_balancer.h 
b/src/brpc/policy/randomized_load_balancer.h
index e599648b..3787e45a 100644
--- a/src/brpc/policy/randomized_load_balancer.h
+++ b/src/brpc/policy/randomized_load_balancer.h
@@ -33,14 +33,14 @@ namespace policy {
 // than RoundRobinLoadBalancer.
 class RandomizedLoadBalancer : public LoadBalancer {
 public:
-    bool AddServer(const ServerId& id);
-    bool RemoveServer(const ServerId& id);
-    size_t AddServersInBatch(const std::vector<ServerId>& servers);
-    size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
-    int SelectServer(const SelectIn& in, SelectOut* out);
-    RandomizedLoadBalancer* New(const butil::StringPiece&) const;
-    void Destroy();
-    void Describe(std::ostream& os, const DescribeOptions&);
+    bool AddServer(const ServerId& id) override;
+    bool RemoveServer(const ServerId& id) override;
+    size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
+    size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
+    int SelectServer(const SelectIn& in, SelectOut* out) override;
+    RandomizedLoadBalancer* New(const butil::StringPiece&) const override;
+    void Destroy() override;
+    void Describe(std::ostream& os, const DescribeOptions&) override;
     
 private:
     struct Servers {
diff --git a/src/brpc/policy/round_robin_load_balancer.cpp 
b/src/brpc/policy/round_robin_load_balancer.cpp
index 0bc2f58b..1d16131a 100644
--- a/src/brpc/policy/round_robin_load_balancer.cpp
+++ b/src/brpc/policy/round_robin_load_balancer.cpp
@@ -25,14 +25,6 @@
 namespace brpc {
 namespace policy {
 
-const uint32_t prime_offset[] = {
-#include "bthread/offset_inl.list"
-};
-
-inline uint32_t GenRandomStride() {
-    return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
-}
-
 bool RoundRobinLoadBalancer::Add(Servers& bg, const ServerId& id) {
     if (bg.server_list.capacity() < 128) {
         bg.server_list.reserve(128);
diff --git a/src/brpc/policy/round_robin_load_balancer.h 
b/src/brpc/policy/round_robin_load_balancer.h
index c5a34a8c..f087dcdc 100644
--- a/src/brpc/policy/round_robin_load_balancer.h
+++ b/src/brpc/policy/round_robin_load_balancer.h
@@ -32,14 +32,14 @@ namespace policy {
 // at the same time) are very close.
 class RoundRobinLoadBalancer : public LoadBalancer {
 public:
-    bool AddServer(const ServerId& id);
-    bool RemoveServer(const ServerId& id);
-    size_t AddServersInBatch(const std::vector<ServerId>& servers);
-    size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
-    int SelectServer(const SelectIn& in, SelectOut* out);
-    RoundRobinLoadBalancer* New(const butil::StringPiece&) const;
-    void Destroy();
-    void Describe(std::ostream&, const DescribeOptions& options);
+    bool AddServer(const ServerId& id) override;
+    bool RemoveServer(const ServerId& id) override;
+    size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
+    size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
+    int SelectServer(const SelectIn& in, SelectOut* out) override;
+    RoundRobinLoadBalancer* New(const butil::StringPiece&) const override;
+    void Destroy() override;
+    void Describe(std::ostream&, const DescribeOptions& options) override;
 
 private:
     struct Servers {
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.cpp 
b/src/brpc/policy/weighted_randomized_load_balancer.cpp
index 0e741ef8..28cd7e3f 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.cpp
+++ b/src/brpc/policy/weighted_randomized_load_balancer.cpp
@@ -26,8 +26,9 @@
 namespace brpc {
 namespace policy {
 
-static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs, 
const WeightedRandomizedLoadBalancer::Server& rhs) {
-        return (lhs.current_weight_sum < rhs.current_weight_sum);
+static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs,
+                           const WeightedRandomizedLoadBalancer::Server& rhs) {
+        return lhs.current_weight_sum < rhs.current_weight_sum;
 }
 
 bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) {
@@ -38,7 +39,8 @@ bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const 
ServerId& id) {
     if (!butil::StringToUint(id.tag, &weight) || weight <= 0) {
         if (FLAGS_default_weight_of_wlb > 0) {
             LOG(WARNING) << "Invalid weight is set: " << id.tag
-                         << ". Now, 'weight' has been set to 
'FLAGS_default_weight_of_wlb' by default.";
+                         << ". Now, 'weight' has been set to "
+                            "FLAGS_default_weight_of_wlb by default.";
             weight = FLAGS_default_weight_of_wlb;
         } else {
             LOG(ERROR) << "Invalid weight is set: " << id.tag;
@@ -46,7 +48,7 @@ bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const 
ServerId& id) {
         }
     }
     bool insert_server =
-             bg.server_map.emplace(id.id, bg.server_list.size()).second;
+        bg.server_map.emplace(id.id, bg.server_list.size()).second;
     if (insert_server) {
         uint64_t current_weight_sum = bg.weight_sum + weight;
         bg.server_list.emplace_back(id.id, weight, current_weight_sum);
@@ -114,6 +116,10 @@ size_t 
WeightedRandomizedLoadBalancer::RemoveServersInBatch(
     return _db_servers.Modify(BatchRemove, servers);
 }
 
+bool WeightedRandomizedLoadBalancer::IsServerAvailable(SocketId id, 
SocketUniquePtr* out) {
+    return Socket::Address(id, out) == 0 && (*out)->IsAvailable();
+}
+
 int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, 
SelectOut* out) {
     butil::DoublyBufferedData<Servers>::ScopedPtr s;
     if (_db_servers.Read(&s) != 0) {
@@ -123,22 +129,49 @@ int WeightedRandomizedLoadBalancer::SelectServer(const 
SelectIn& in, SelectOut*
     if (n == 0) {
         return ENODATA;
     }
+
+    butil::FlatSet<SocketId> random_traversed;
     uint64_t weight_sum = s->weight_sum;
     for (size_t i = 0; i < n; ++i) {
         uint64_t random_weight = butil::fast_rand_less_than(weight_sum);
         const Server random_server(0, 0, random_weight);
-        const auto& server = std::lower_bound(s->server_list.begin(), 
s->server_list.end(), random_server, server_compare);
+        const auto& server =
+            std::lower_bound(s->server_list.begin(), s->server_list.end(),
+                             random_server, server_compare);
         const SocketId id = server->id;
-        if (((i + 1) == n  // always take last chance
-             || !ExcludedServers::IsExcluded(in.excluded, id))
-            && Socket::Address(id, out->ptr) == 0
-            && (*out->ptr)->IsAvailable()) {
-            // We found an available server
+        if (ExcludedServers::IsExcluded(in.excluded, id)) {
+            continue;
+        }
+        random_traversed.insert(id);
+        if (0 == IsServerAvailable(id, out->ptr)) {
+            // An available server is found.
             return 0;
         }
     }
-    // After we traversed the whole server list, there is still no
-    // available server
+
+    if (random_traversed.size() == n) {
+        // Try to traverse the remaining servers to find an available server.
+        uint32_t offset = butil::fast_rand_less_than(n);
+        uint32_t stride = GenRandomStride();
+        for (size_t i = 0; i < n; ++i) {
+            offset = (offset + stride) % n;
+            SocketId id = s->server_list[offset].id;
+            if (NULL != random_traversed.seek(id)) {
+                continue;
+            }
+            if (IsServerAvailable(id, out->ptr)) {
+                // An available server is found.
+                return 0;
+            }
+        }
+    }
+
+    if (NULL != out->ptr) {
+        // Use the excluded but available server.
+        return 0;
+    }
+
+    // After traversing the whole server list, no available server is found.
     return EHOSTDOWN;
 }
 
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.h 
b/src/brpc/policy/weighted_randomized_load_balancer.h
index 2c8b0fd4..3842affa 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.h
+++ b/src/brpc/policy/weighted_randomized_load_balancer.h
@@ -31,17 +31,18 @@ namespace policy {
 // Weight is got from tag of ServerId.
 class WeightedRandomizedLoadBalancer : public LoadBalancer {
 public:
-    bool AddServer(const ServerId& id);
-    bool RemoveServer(const ServerId& id);
-    size_t AddServersInBatch(const std::vector<ServerId>& servers);
-    size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
-    int SelectServer(const SelectIn& in, SelectOut* out);
-    LoadBalancer* New(const butil::StringPiece&) const;
-    void Destroy();
-    void Describe(std::ostream& os, const DescribeOptions&);
+    bool AddServer(const ServerId& id) override;
+    bool RemoveServer(const ServerId& id) override;
+    size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
+    size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
+    int SelectServer(const SelectIn& in, SelectOut* out) override;
+    LoadBalancer* New(const butil::StringPiece&) const override;
+    void Destroy() override;
+    void Describe(std::ostream& os, const DescribeOptions&) override;
 
     struct Server {
-        Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0): 
id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
+        Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0)
+            : id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {}
         SocketId id;
         uint32_t weight;
         uint64_t current_weight_sum;
@@ -60,6 +61,7 @@ private:
     static bool Remove(Servers& bg, const ServerId& id);
     static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers);
     static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& 
servers);
+    static bool IsServerAvailable(SocketId id, SocketUniquePtr* out);
 
     butil::DoublyBufferedData<Servers> _db_servers;
 };
diff --git a/src/brpc/policy/weighted_round_robin_load_balancer.cpp 
b/src/brpc/policy/weighted_round_robin_load_balancer.cpp
index 598d7dc0..44d8a957 100644
--- a/src/brpc/policy/weighted_round_robin_load_balancer.cpp
+++ b/src/brpc/policy/weighted_round_robin_load_balancer.cpp
@@ -58,8 +58,8 @@ uint64_t GetStride(const uint64_t weight_sum, const size_t 
num) {
       return 1;
     }
     uint32_t average_weight = weight_sum / num;
-    auto iter = std::lower_bound(prime_stride.begin(), prime_stride.end(),
-                                 average_weight);
+    auto iter = std::lower_bound(
+        prime_stride.begin(), prime_stride.end(), average_weight);
     while (iter != prime_stride.end()
            && !IsCoprime(weight_sum, *iter)) {
         ++iter;
@@ -197,7 +197,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const 
SelectIn& in, SelectOut*
             }
             filter.emplace(server_id);
             remain_weight -= 
(s->server_list[s->server_map.at(server_id)]).weight;
-            // Select from begining status.
+            // Select from beginning status.
             tls_temp.stride = GetStride(remain_weight, remain_servers);
             tls_temp.position = tls.position;
             tls_temp.remain_server = tls.remain_server;
diff --git a/src/brpc/policy/weighted_round_robin_load_balancer.h 
b/src/brpc/policy/weighted_round_robin_load_balancer.h
index fc3df2da..828de659 100644
--- a/src/brpc/policy/weighted_round_robin_load_balancer.h
+++ b/src/brpc/policy/weighted_round_robin_load_balancer.h
@@ -32,14 +32,14 @@ namespace policy {
 // Weight is got from tag of ServerId.
 class WeightedRoundRobinLoadBalancer : public LoadBalancer {
 public:
-    bool AddServer(const ServerId& id);
-    bool RemoveServer(const ServerId& id);
-    size_t AddServersInBatch(const std::vector<ServerId>& servers);
-    size_t RemoveServersInBatch(const std::vector<ServerId>& servers);
-    int SelectServer(const SelectIn& in, SelectOut* out);
-    LoadBalancer* New(const butil::StringPiece&) const;
-    void Destroy();
-    void Describe(std::ostream&, const DescribeOptions& options);
+    bool AddServer(const ServerId& id) override;
+    bool RemoveServer(const ServerId& id) override;
+    size_t AddServersInBatch(const std::vector<ServerId>& servers) override;
+    size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override;
+    int SelectServer(const SelectIn& in, SelectOut* out) override;
+    LoadBalancer* New(const butil::StringPiece&) const override;
+    void Destroy() override;
+    void Describe(std::ostream&, const DescribeOptions& options) override;
 
 private:
     struct Server {
diff --git a/test/bthread_countdown_event_unittest.cpp 
b/test/bthread_countdown_event_unittest.cpp
index bb018eee..bed8f17e 100644
--- a/test/bthread_countdown_event_unittest.cpp
+++ b/test/bthread_countdown_event_unittest.cpp
@@ -36,6 +36,7 @@ void *signaler(void *arg) {
 }
 
 TEST(CountdonwEventTest, sanity) {
+    std::vector<bthread_t> tids;
     for (int n = 1; n < 10; ++n) {
         Arg a;
         a.num_sig = n;
@@ -43,10 +44,14 @@ TEST(CountdonwEventTest, sanity) {
         for (int i = 0; i < n; ++i) {
             bthread_t tid;
             ASSERT_EQ(0, bthread_start_urgent(&tid, NULL, signaler, &a));
+            tids.push_back(tid);
         }
         a.event.wait();
         ASSERT_EQ(0, a.num_sig.load(butil::memory_order_relaxed));
     }
+    for (size_t i = 0; i < tids.size(); ++i) {
+        bthread_join(tids[i], NULL);
+    }
 }
 
 TEST(CountdonwEventTest, timed_wait) {


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to