This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 011dabe9 Make sure WeightedRandomizedLoadBalancer can traverse the whole server list (#2953) 011dabe9 is described below commit 011dabe9c896705fd539df6467ebab1c3f02dcf1 Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Thu May 8 17:56:00 2025 +0800 Make sure WeightedRandomizedLoadBalancer can traverse the whole server list (#2953) * Make sure WeightedRandomizedLoadBalancer can traverse the whole server list * Only mark unavailable server as traversed --- src/brpc/load_balancer.h | 7 +++ src/brpc/policy/dynpart_load_balancer.h | 16 +++--- src/brpc/policy/locality_aware_load_balancer.h | 20 ++++---- src/brpc/policy/randomized_load_balancer.cpp | 8 --- src/brpc/policy/randomized_load_balancer.h | 16 +++--- src/brpc/policy/round_robin_load_balancer.cpp | 8 --- src/brpc/policy/round_robin_load_balancer.h | 16 +++--- .../policy/weighted_randomized_load_balancer.cpp | 57 +++++++++++++++++----- .../policy/weighted_randomized_load_balancer.h | 20 ++++---- .../policy/weighted_round_robin_load_balancer.cpp | 6 +-- .../policy/weighted_round_robin_load_balancer.h | 16 +++--- test/bthread_countdown_event_unittest.cpp | 5 ++ 12 files changed, 113 insertions(+), 82 deletions(-) diff --git a/src/brpc/load_balancer.h b/src/brpc/load_balancer.h index cda0517e..a32b298d 100644 --- a/src/brpc/load_balancer.h +++ b/src/brpc/load_balancer.h @@ -184,6 +184,13 @@ inline Extension<const LoadBalancer>* LoadBalancerExtension() { return Extension<const LoadBalancer>::instance(); } +inline uint32_t GenRandomStride() { + uint32_t prime_offset[] = { + #include "bthread/offset_inl.list" + }; + return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))]; +} + } // namespace brpc diff --git a/src/brpc/policy/dynpart_load_balancer.h b/src/brpc/policy/dynpart_load_balancer.h index d3fd9d68..4e8833bf 100644 --- a/src/brpc/policy/dynpart_load_balancer.h +++ b/src/brpc/policy/dynpart_load_balancer.h @@ -33,14 +33,14 @@ namespace policy { class DynPartLoadBalancer : public LoadBalancer { public: - bool AddServer(const ServerId& id); - bool RemoveServer(const ServerId& id); - size_t AddServersInBatch(const std::vector<ServerId>& servers); - size_t RemoveServersInBatch(const std::vector<ServerId>& servers); - int SelectServer(const SelectIn& in, SelectOut* out); - DynPartLoadBalancer* New(const butil::StringPiece&) const; - void Destroy(); - void Describe(std::ostream&, const DescribeOptions& options); + bool AddServer(const ServerId& id) override; + bool RemoveServer(const ServerId& id) override; + size_t AddServersInBatch(const std::vector<ServerId>& servers) override; + size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override; + int SelectServer(const SelectIn& in, SelectOut* out) override; + DynPartLoadBalancer* New(const butil::StringPiece&) const override; + void Destroy() override; + void Describe(std::ostream&, const DescribeOptions& options) override; private: struct Servers { diff --git a/src/brpc/policy/locality_aware_load_balancer.h b/src/brpc/policy/locality_aware_load_balancer.h index 4129a2d5..82373a36 100644 --- a/src/brpc/policy/locality_aware_load_balancer.h +++ b/src/brpc/policy/locality_aware_load_balancer.h @@ -41,16 +41,16 @@ DECLARE_double(punish_inflight_ratio); class LocalityAwareLoadBalancer : public LoadBalancer { public: LocalityAwareLoadBalancer(); - ~LocalityAwareLoadBalancer(); - bool AddServer(const ServerId& id); - bool RemoveServer(const ServerId& id); - size_t AddServersInBatch(const std::vector<ServerId>& servers); - size_t RemoveServersInBatch(const std::vector<ServerId>& servers); - LocalityAwareLoadBalancer* New(const butil::StringPiece&) const; - void Destroy(); - int SelectServer(const SelectIn& in, SelectOut* out); - void Feedback(const CallInfo& info); - void Describe(std::ostream& os, const DescribeOptions& options); + ~LocalityAwareLoadBalancer() override; + bool AddServer(const ServerId& id) override; + bool RemoveServer(const ServerId& id) override; + size_t AddServersInBatch(const std::vector<ServerId>& servers) override; + size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override; + LocalityAwareLoadBalancer* New(const butil::StringPiece&) const override; + void Destroy() override; + int SelectServer(const SelectIn& in, SelectOut* out) override; + void Feedback(const CallInfo& info) override; + void Describe(std::ostream& os, const DescribeOptions& options) override; private: struct TimeInfo { diff --git a/src/brpc/policy/randomized_load_balancer.cpp b/src/brpc/policy/randomized_load_balancer.cpp index 353074eb..5c4ba447 100644 --- a/src/brpc/policy/randomized_load_balancer.cpp +++ b/src/brpc/policy/randomized_load_balancer.cpp @@ -25,14 +25,6 @@ namespace brpc { namespace policy { -const uint32_t prime_offset[] = { -#include "bthread/offset_inl.list" -}; - -inline uint32_t GenRandomStride() { - return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))]; -} - bool RandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) { if (bg.server_list.capacity() < 128) { bg.server_list.reserve(128); diff --git a/src/brpc/policy/randomized_load_balancer.h b/src/brpc/policy/randomized_load_balancer.h index e599648b..3787e45a 100644 --- a/src/brpc/policy/randomized_load_balancer.h +++ b/src/brpc/policy/randomized_load_balancer.h @@ -33,14 +33,14 @@ namespace policy { // than RoundRobinLoadBalancer. class RandomizedLoadBalancer : public LoadBalancer { public: - bool AddServer(const ServerId& id); - bool RemoveServer(const ServerId& id); - size_t AddServersInBatch(const std::vector<ServerId>& servers); - size_t RemoveServersInBatch(const std::vector<ServerId>& servers); - int SelectServer(const SelectIn& in, SelectOut* out); - RandomizedLoadBalancer* New(const butil::StringPiece&) const; - void Destroy(); - void Describe(std::ostream& os, const DescribeOptions&); + bool AddServer(const ServerId& id) override; + bool RemoveServer(const ServerId& id) override; + size_t AddServersInBatch(const std::vector<ServerId>& servers) override; + size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override; + int SelectServer(const SelectIn& in, SelectOut* out) override; + RandomizedLoadBalancer* New(const butil::StringPiece&) const override; + void Destroy() override; + void Describe(std::ostream& os, const DescribeOptions&) override; private: struct Servers { diff --git a/src/brpc/policy/round_robin_load_balancer.cpp b/src/brpc/policy/round_robin_load_balancer.cpp index 0bc2f58b..1d16131a 100644 --- a/src/brpc/policy/round_robin_load_balancer.cpp +++ b/src/brpc/policy/round_robin_load_balancer.cpp @@ -25,14 +25,6 @@ namespace brpc { namespace policy { -const uint32_t prime_offset[] = { -#include "bthread/offset_inl.list" -}; - -inline uint32_t GenRandomStride() { - return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))]; -} - bool RoundRobinLoadBalancer::Add(Servers& bg, const ServerId& id) { if (bg.server_list.capacity() < 128) { bg.server_list.reserve(128); diff --git a/src/brpc/policy/round_robin_load_balancer.h b/src/brpc/policy/round_robin_load_balancer.h index c5a34a8c..f087dcdc 100644 --- a/src/brpc/policy/round_robin_load_balancer.h +++ b/src/brpc/policy/round_robin_load_balancer.h @@ -32,14 +32,14 @@ namespace policy { // at the same time) are very close. class RoundRobinLoadBalancer : public LoadBalancer { public: - bool AddServer(const ServerId& id); - bool RemoveServer(const ServerId& id); - size_t AddServersInBatch(const std::vector<ServerId>& servers); - size_t RemoveServersInBatch(const std::vector<ServerId>& servers); - int SelectServer(const SelectIn& in, SelectOut* out); - RoundRobinLoadBalancer* New(const butil::StringPiece&) const; - void Destroy(); - void Describe(std::ostream&, const DescribeOptions& options); + bool AddServer(const ServerId& id) override; + bool RemoveServer(const ServerId& id) override; + size_t AddServersInBatch(const std::vector<ServerId>& servers) override; + size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override; + int SelectServer(const SelectIn& in, SelectOut* out) override; + RoundRobinLoadBalancer* New(const butil::StringPiece&) const override; + void Destroy() override; + void Describe(std::ostream&, const DescribeOptions& options) override; private: struct Servers { diff --git a/src/brpc/policy/weighted_randomized_load_balancer.cpp b/src/brpc/policy/weighted_randomized_load_balancer.cpp index 0e741ef8..28cd7e3f 100644 --- a/src/brpc/policy/weighted_randomized_load_balancer.cpp +++ b/src/brpc/policy/weighted_randomized_load_balancer.cpp @@ -26,8 +26,9 @@ namespace brpc { namespace policy { -static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs, const WeightedRandomizedLoadBalancer::Server& rhs) { - return (lhs.current_weight_sum < rhs.current_weight_sum); +static bool server_compare(const WeightedRandomizedLoadBalancer::Server& lhs, + const WeightedRandomizedLoadBalancer::Server& rhs) { + return lhs.current_weight_sum < rhs.current_weight_sum; } bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) { @@ -38,7 +39,8 @@ bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) { if (!butil::StringToUint(id.tag, &weight) || weight <= 0) { if (FLAGS_default_weight_of_wlb > 0) { LOG(WARNING) << "Invalid weight is set: " << id.tag - << ". Now, 'weight' has been set to 'FLAGS_default_weight_of_wlb' by default."; + << ". Now, 'weight' has been set to " + "FLAGS_default_weight_of_wlb by default."; weight = FLAGS_default_weight_of_wlb; } else { LOG(ERROR) << "Invalid weight is set: " << id.tag; @@ -46,7 +48,7 @@ bool WeightedRandomizedLoadBalancer::Add(Servers& bg, const ServerId& id) { } } bool insert_server = - bg.server_map.emplace(id.id, bg.server_list.size()).second; + bg.server_map.emplace(id.id, bg.server_list.size()).second; if (insert_server) { uint64_t current_weight_sum = bg.weight_sum + weight; bg.server_list.emplace_back(id.id, weight, current_weight_sum); @@ -114,6 +116,10 @@ size_t WeightedRandomizedLoadBalancer::RemoveServersInBatch( return _db_servers.Modify(BatchRemove, servers); } +bool WeightedRandomizedLoadBalancer::IsServerAvailable(SocketId id, SocketUniquePtr* out) { + return Socket::Address(id, out) == 0 && (*out)->IsAvailable(); +} + int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) { butil::DoublyBufferedData<Servers>::ScopedPtr s; if (_db_servers.Read(&s) != 0) { @@ -123,22 +129,49 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* if (n == 0) { return ENODATA; } + + butil::FlatSet<SocketId> random_traversed; uint64_t weight_sum = s->weight_sum; for (size_t i = 0; i < n; ++i) { uint64_t random_weight = butil::fast_rand_less_than(weight_sum); const Server random_server(0, 0, random_weight); - const auto& server = std::lower_bound(s->server_list.begin(), s->server_list.end(), random_server, server_compare); + const auto& server = + std::lower_bound(s->server_list.begin(), s->server_list.end(), + random_server, server_compare); const SocketId id = server->id; - if (((i + 1) == n // always take last chance - || !ExcludedServers::IsExcluded(in.excluded, id)) - && Socket::Address(id, out->ptr) == 0 - && (*out->ptr)->IsAvailable()) { - // We found an available server + if (ExcludedServers::IsExcluded(in.excluded, id)) { + continue; + } + random_traversed.insert(id); + if (0 == IsServerAvailable(id, out->ptr)) { + // An available server is found. return 0; } } - // After we traversed the whole server list, there is still no - // available server + + if (random_traversed.size() == n) { + // Try to traverse the remaining servers to find an available server. + uint32_t offset = butil::fast_rand_less_than(n); + uint32_t stride = GenRandomStride(); + for (size_t i = 0; i < n; ++i) { + offset = (offset + stride) % n; + SocketId id = s->server_list[offset].id; + if (NULL != random_traversed.seek(id)) { + continue; + } + if (IsServerAvailable(id, out->ptr)) { + // An available server is found. + return 0; + } + } + } + + if (NULL != out->ptr) { + // Use the excluded but available server. + return 0; + } + + // After traversing the whole server list, no available server is found. return EHOSTDOWN; } diff --git a/src/brpc/policy/weighted_randomized_load_balancer.h b/src/brpc/policy/weighted_randomized_load_balancer.h index 2c8b0fd4..3842affa 100644 --- a/src/brpc/policy/weighted_randomized_load_balancer.h +++ b/src/brpc/policy/weighted_randomized_load_balancer.h @@ -31,17 +31,18 @@ namespace policy { // Weight is got from tag of ServerId. class WeightedRandomizedLoadBalancer : public LoadBalancer { public: - bool AddServer(const ServerId& id); - bool RemoveServer(const ServerId& id); - size_t AddServersInBatch(const std::vector<ServerId>& servers); - size_t RemoveServersInBatch(const std::vector<ServerId>& servers); - int SelectServer(const SelectIn& in, SelectOut* out); - LoadBalancer* New(const butil::StringPiece&) const; - void Destroy(); - void Describe(std::ostream& os, const DescribeOptions&); + bool AddServer(const ServerId& id) override; + bool RemoveServer(const ServerId& id) override; + size_t AddServersInBatch(const std::vector<ServerId>& servers) override; + size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override; + int SelectServer(const SelectIn& in, SelectOut* out) override; + LoadBalancer* New(const butil::StringPiece&) const override; + void Destroy() override; + void Describe(std::ostream& os, const DescribeOptions&) override; struct Server { - Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0): id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {} + Server(SocketId s_id = 0, uint32_t s_w = 0, uint64_t s_c_w_s = 0) + : id(s_id), weight(s_w), current_weight_sum(s_c_w_s) {} SocketId id; uint32_t weight; uint64_t current_weight_sum; @@ -60,6 +61,7 @@ private: static bool Remove(Servers& bg, const ServerId& id); static size_t BatchAdd(Servers& bg, const std::vector<ServerId>& servers); static size_t BatchRemove(Servers& bg, const std::vector<ServerId>& servers); + static bool IsServerAvailable(SocketId id, SocketUniquePtr* out); butil::DoublyBufferedData<Servers> _db_servers; }; diff --git a/src/brpc/policy/weighted_round_robin_load_balancer.cpp b/src/brpc/policy/weighted_round_robin_load_balancer.cpp index 598d7dc0..44d8a957 100644 --- a/src/brpc/policy/weighted_round_robin_load_balancer.cpp +++ b/src/brpc/policy/weighted_round_robin_load_balancer.cpp @@ -58,8 +58,8 @@ uint64_t GetStride(const uint64_t weight_sum, const size_t num) { return 1; } uint32_t average_weight = weight_sum / num; - auto iter = std::lower_bound(prime_stride.begin(), prime_stride.end(), - average_weight); + auto iter = std::lower_bound( + prime_stride.begin(), prime_stride.end(), average_weight); while (iter != prime_stride.end() && !IsCoprime(weight_sum, *iter)) { ++iter; @@ -197,7 +197,7 @@ int WeightedRoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* } filter.emplace(server_id); remain_weight -= (s->server_list[s->server_map.at(server_id)]).weight; - // Select from begining status. + // Select from beginning status. tls_temp.stride = GetStride(remain_weight, remain_servers); tls_temp.position = tls.position; tls_temp.remain_server = tls.remain_server; diff --git a/src/brpc/policy/weighted_round_robin_load_balancer.h b/src/brpc/policy/weighted_round_robin_load_balancer.h index fc3df2da..828de659 100644 --- a/src/brpc/policy/weighted_round_robin_load_balancer.h +++ b/src/brpc/policy/weighted_round_robin_load_balancer.h @@ -32,14 +32,14 @@ namespace policy { // Weight is got from tag of ServerId. class WeightedRoundRobinLoadBalancer : public LoadBalancer { public: - bool AddServer(const ServerId& id); - bool RemoveServer(const ServerId& id); - size_t AddServersInBatch(const std::vector<ServerId>& servers); - size_t RemoveServersInBatch(const std::vector<ServerId>& servers); - int SelectServer(const SelectIn& in, SelectOut* out); - LoadBalancer* New(const butil::StringPiece&) const; - void Destroy(); - void Describe(std::ostream&, const DescribeOptions& options); + bool AddServer(const ServerId& id) override; + bool RemoveServer(const ServerId& id) override; + size_t AddServersInBatch(const std::vector<ServerId>& servers) override; + size_t RemoveServersInBatch(const std::vector<ServerId>& servers) override; + int SelectServer(const SelectIn& in, SelectOut* out) override; + LoadBalancer* New(const butil::StringPiece&) const override; + void Destroy() override; + void Describe(std::ostream&, const DescribeOptions& options) override; private: struct Server { diff --git a/test/bthread_countdown_event_unittest.cpp b/test/bthread_countdown_event_unittest.cpp index bb018eee..bed8f17e 100644 --- a/test/bthread_countdown_event_unittest.cpp +++ b/test/bthread_countdown_event_unittest.cpp @@ -36,6 +36,7 @@ void *signaler(void *arg) { } TEST(CountdonwEventTest, sanity) { + std::vector<bthread_t> tids; for (int n = 1; n < 10; ++n) { Arg a; a.num_sig = n; @@ -43,10 +44,14 @@ TEST(CountdonwEventTest, sanity) { for (int i = 0; i < n; ++i) { bthread_t tid; ASSERT_EQ(0, bthread_start_urgent(&tid, NULL, signaler, &a)); + tids.push_back(tid); } a.event.wait(); ASSERT_EQ(0, a.num_sig.load(butil::memory_order_relaxed)); } + for (size_t i = 0; i < tids.size(); ++i) { + bthread_join(tids[i], NULL); + } } TEST(CountdonwEventTest, timed_wait) { --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org