This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
     new 05ec537a Bugfix: bthread_worker_usage could exceed bthread_worker_count (#3009)
05ec537a is described below

commit 05ec537ae67f9a1b2ea28f14e33d18d7155ce152
Author:     Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Wed Jul 23 10:15:36 2025 +0800

    Bugfix: bthread_worker_usage could exceed bthread_worker_count (#3009)

    * Bugfix: bthread_worker_usage would be greater than bthread_worker_count

    * Use CPU atomic 128-bit aligned loads and stores

    * Encapsulating AtomicInteger128 class

    * Remove unused header files
---
 src/brpc/load_balancer.h                          |   7 --
 src/brpc/policy/randomized_load_balancer.cpp      |   3 +-
 src/brpc/policy/round_robin_load_balancer.cpp     |   3 +-
 .../policy/weighted_randomized_load_balancer.cpp  |   3 +-
 src/bthread/prime_offset.h                        |  39 ++++++
 src/bthread/task_control.cpp                      |  12 +-
 src/bthread/task_control.h                        |   2 +-
 src/bthread/task_group.cpp                        | 113 +++++++++++------
 src/bthread/task_group.h                          | 140 +++++++++++++++++----
 9 files changed, 240 insertions(+), 82 deletions(-)

diff --git a/src/brpc/load_balancer.h b/src/brpc/load_balancer.h
index a32b298d..cda0517e 100644
--- a/src/brpc/load_balancer.h
+++ b/src/brpc/load_balancer.h
@@ -184,13 +184,6 @@ inline Extension<const LoadBalancer>* LoadBalancerExtension() {
     return Extension<const LoadBalancer>::instance();
 }
 
-inline uint32_t GenRandomStride() {
-    uint32_t prime_offset[] = {
-        #include "bthread/offset_inl.list"
-    };
-    return prime_offset[butil::fast_rand_less_than(ARRAY_SIZE(prime_offset))];
-}
-
 } // namespace brpc
diff --git a/src/brpc/policy/randomized_load_balancer.cpp b/src/brpc/policy/randomized_load_balancer.cpp
index 5c4ba447..65cfdee9 100644
--- a/src/brpc/policy/randomized_load_balancer.cpp
+++ b/src/brpc/policy/randomized_load_balancer.cpp
@@ -18,6 +18,7 @@
 
 #include "butil/macros.h"
 #include "butil/fast_rand.h"
+#include "bthread/prime_offset.h"
 #include "brpc/socket.h"
 #include "brpc/policy/randomized_load_balancer.h"
 #include "butil/strings/string_number_conversions.h"
@@ -118,7 +119,7 @@ int RandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
         return 0;
     }
     if (stride == 0) {
-        stride = GenRandomStride();
+        stride = bthread::prime_offset();
     }
     // If `Address' failed, use `offset+stride' to retry so that
     // this failed server won't be visited again inside for
diff --git a/src/brpc/policy/round_robin_load_balancer.cpp b/src/brpc/policy/round_robin_load_balancer.cpp
index 1d16131a..fa69aa86 100644
--- a/src/brpc/policy/round_robin_load_balancer.cpp
+++ b/src/brpc/policy/round_robin_load_balancer.cpp
@@ -18,6 +18,7 @@
 
 #include "butil/macros.h"
 #include "butil/fast_rand.h"
+#include "bthread/prime_offset.h"
 #include "brpc/socket.h"
 #include "brpc/policy/round_robin_load_balancer.h"
 
@@ -108,7 +109,7 @@ int RoundRobinLoadBalancer::SelectServer(const SelectIn& in, SelectOut* out) {
     }
     TLS tls = s.tls();
     if (tls.stride == 0) {
-        tls.stride = GenRandomStride();
+        tls.stride = bthread::prime_offset();
         // use random at first time, for the case of
         // use rr lb every time in new thread
         tls.offset = butil::fast_rand_less_than(n);
diff --git a/src/brpc/policy/weighted_randomized_load_balancer.cpp b/src/brpc/policy/weighted_randomized_load_balancer.cpp
index 28cd7e3f..819c550c 100644
--- a/src/brpc/policy/weighted_randomized_load_balancer.cpp
+++ b/src/brpc/policy/weighted_randomized_load_balancer.cpp
@@ -19,6 +19,7 @@
 #include <algorithm>
 
 #include "butil/fast_rand.h"
+#include "bthread/prime_offset.h"
 #include "brpc/socket.h"
 #include "brpc/policy/weighted_randomized_load_balancer.h"
 #include "butil/strings/string_number_conversions.h"
@@ -152,7 +153,7 @@ int WeightedRandomizedLoadBalancer::SelectServer(const SelectIn& in, SelectOut*
     if (random_traversed.size() == n) {
         // Try to traverse the remaining servers to find an available server.
         uint32_t offset = butil::fast_rand_less_than(n);
-        uint32_t stride = GenRandomStride();
+        uint32_t stride = bthread::prime_offset();
         for (size_t i = 0; i < n; ++i) {
             offset = (offset + stride) % n;
             SocketId id = s->server_list[offset].id;
diff --git a/src/bthread/prime_offset.h b/src/bthread/prime_offset.h
new file mode 100644
index 00000000..7137a68c
--- /dev/null
+++ b/src/bthread/prime_offset.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BTHREAD_PRIME_OFFSET_H
+#define BTHREAD_PRIME_OFFSET_H
+
+#include "butil/fast_rand.h"
+#include "butil/macros.h"
+
+namespace bthread {
+// Prime number offset for hash function.
+inline size_t prime_offset(size_t seed) {
+    uint32_t offsets[] = {
+        #include "bthread/offset_inl.list"
+    };
+    return offsets[seed % ARRAY_SIZE(offsets)];
+}
+
+inline size_t prime_offset() {
+    return prime_offset(butil::fast_rand());
+}
+}
+
+
+#endif // BTHREAD_PRIME_OFFSET_H
\ No newline at end of file
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 66307d32..94be504f 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -152,7 +152,7 @@ static double get_cumulated_worker_time_from_this_with_tag(void* arg) {
     auto a = static_cast<CumulatedWithTagArgs*>(arg);
     auto c = a->c;
     auto t = a->t;
-    return c->get_cumulated_worker_time_with_tag(t);
+    return c->get_cumulated_worker_time(t);
 }
 
 static int64_t get_cumulated_switch_count_from_this(void *arg) {
@@ -526,22 +526,18 @@ double TaskControl::get_cumulated_worker_time() {
     int64_t cputime_ns = 0;
     BAIDU_SCOPED_LOCK(_modify_group_mutex);
     for_each_task_group([&](TaskGroup* g) {
-        if (g) {
-            cputime_ns += g->_cumulated_cputime_ns;
-        }
+        cputime_ns += g->cumulated_cputime_ns();
     });
     return cputime_ns / 1000000000.0;
 }
 
-double TaskControl::get_cumulated_worker_time_with_tag(bthread_tag_t tag) {
+double TaskControl::get_cumulated_worker_time(bthread_tag_t tag) {
     int64_t cputime_ns = 0;
     BAIDU_SCOPED_LOCK(_modify_group_mutex);
     const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed);
     auto& groups = tag_group(tag);
     for (size_t i = 0; i < ngroup; ++i) {
-        if (groups[i]) {
-            cputime_ns += groups[i]->_cumulated_cputime_ns;
-        }
+        cputime_ns += groups[i]->cumulated_cputime_ns();
     }
     return cputime_ns / 1000000000.0;
 }
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 2a2b76d6..11587b29 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -79,7 +79,7 @@ public:
     void print_rq_sizes(std::ostream& os);
 
     double get_cumulated_worker_time();
-    double get_cumulated_worker_time_with_tag(bthread_tag_t tag);
+    double get_cumulated_worker_time(bthread_tag_t tag);
     int64_t get_cumulated_switch_count();
     int64_t get_cumulated_signal_count();
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 0d3e473e..d4cb81f6 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -37,6 +37,14 @@
 #include "bthread/task_group.h"
 #include "bthread/timer_thread.h"
 
+#ifdef __x86_64__
+#include <x86intrin.h>
+#endif // __x86_64__
+
+#ifdef __ARM_NEON
+#include <arm_neon.h>
+#endif // __ARM_NEON
+
 namespace bthread {
 
 static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = {
@@ -69,10 +77,6 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL);
 
 const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
 
-const size_t OFFSET_TABLE[] = {
-#include "bthread/offset_inl.list"
-};
-
 void* (*g_create_span_func)() = NULL;
 
 void* run_create_span_func() {
@@ -82,6 +86,39 @@ void* run_create_span_func() {
     return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_bls).rpcz_parent_span;
 }
 
+AtomicInteger128::Value AtomicInteger128::load() const {
+#if __x86_64__ || __ARM_NEON
+    // Supress compiler warning.
+    (void)_mutex;
+#endif // __x86_64__ || __ARM_NEON
+
+#if __x86_64__ || __ARM_NEON
+#ifdef __x86_64__
+    __m128i value = _mm_load_si128(reinterpret_cast<const __m128i*>(&_value));
+#else // __ARM_NEON
+    int64x2_t value = vld1q_s64(reinterpret_cast<const int64_t*>(&_value));
+#endif // __x86_64__
+    return {value[0], value[1]};
+#else // __x86_64__ || __ARM_NEON
+    BAIDU_SCOPED_LOCK(_mutex);
+    return _value;
+#endif // __x86_64__ || __ARM_NEON
+}
+
+void AtomicInteger128::store(Value value) {
+#if __x86_64__
+    __m128i v = _mm_load_si128(reinterpret_cast<__m128i*>(&value));
+    _mm_store_si128(reinterpret_cast<__m128i*>(&_value), v);
+#elif __ARM_NEON
+    int64x2_t v = vld1q_s64(reinterpret_cast<int64_t*>(&value));
+    vst1q_s64(reinterpret_cast<int64_t*>(&_value), v);
+#else
+    BAIDU_SCOPED_LOCK(_mutex);
+    _value = value;
+#endif // __x86_64__ || __ARM_NEON
+}
+
+
 int TaskGroup::get_attr(bthread_t tid, bthread_attr_t* out) {
     TaskMeta* const m = address_meta(tid);
     if (m != NULL) {
@@ -148,6 +185,16 @@ static double get_cumulated_cputime_from_this(void* arg) {
     return static_cast<TaskGroup*>(arg)->cumulated_cputime_ns() / 1000000000.0;
 }
 
+int64_t TaskGroup::cumulated_cputime_ns() const {
+    CPUTimeStat cpu_time_stat = _cpu_time_stat.load();
+    // Add the elapsed time of running bthread.
+    int64_t cumulated_cputime_ns = cpu_time_stat.cumulated_cputime_ns();
+    if (!cpu_time_stat.is_main_task()) {
+        cumulated_cputime_ns += butil::cpuwide_time_ns() - cpu_time_stat.last_run_ns();
+    }
+    return cumulated_cputime_ns;
+}
+
 void TaskGroup::run_main_task() {
     bvar::PassiveStatus<double> cumulated_cputime(
         get_cumulated_cputime_from_this, this);
@@ -156,11 +203,11 @@ void TaskGroup::run_main_task() {
     TaskGroup* dummy = this;
     bthread_t tid;
     while (wait_task(&tid)) {
-        TaskGroup::sched_to(&dummy, tid);
+        sched_to(&dummy, tid);
        DCHECK_EQ(this, dummy);
         DCHECK_EQ(_cur_meta->stack, _main_stack);
         if (_cur_meta->tid != _main_tid) {
-            TaskGroup::task_runner(1/*skip remained*/);
+            task_runner(1/*skip remained*/);
         }
         if (FLAGS_show_per_worker_usage_in_vars && !usage_bvar) {
             char name[32];
@@ -176,31 +223,12 @@
         }
     }
     // Don't forget to add elapse of last wait_task.
-    current_task()->stat.cputime_ns += butil::cpuwide_time_ns() - _last_run_ns;
+    current_task()->stat.cputime_ns +=
+        butil::cpuwide_time_ns() - _cpu_time_stat.load_unsafe().last_run_ns();
 }
 
 TaskGroup::TaskGroup(TaskControl* c)
-    : _cur_meta(NULL)
-    , _control(c)
-    , _num_nosignal(0)
-    , _nsignaled(0)
-    , _last_run_ns(butil::cpuwide_time_ns())
-    , _cumulated_cputime_ns(0)
-    , _nswitch(0)
-    , _last_context_remained(NULL)
-    , _last_context_remained_arg(NULL)
-    , _pl(NULL)
-    , _main_stack(NULL)
-    , _main_tid(0)
-    , _remote_num_nosignal(0)
-    , _remote_nsignaled(0)
-#ifndef NDEBUG
-    , _sched_recursive_guard(0)
-#endif
-    , _tag(BTHREAD_TAG_DEFAULT)
-    , _tid(-1) {
-    _steal_seed = butil::fast_rand();
-    _steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
+    : _control(c) {
     CHECK(c);
 }
@@ -292,8 +320,12 @@ int TaskGroup::init(size_t runqueue_capacity) {
     _cur_meta = m;
     _main_tid = m->tid;
     _main_stack = stk;
-    _last_run_ns = butil::cpuwide_time_ns();
+
+    CPUTimeStat cpu_time_stat;
+    cpu_time_stat.set_last_run_ns(m->cpuwide_start_ns, true);
+    _cpu_time_stat.store(cpu_time_stat);
     _last_cpu_clock_ns = 0;
+
     return 0;
 }
@@ -414,7 +446,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
 
         g->_control->_nbthreads << -1;
         g->_control->tag_nbthreads(g->tag()) << -1;
-        g->set_remained(TaskGroup::_release_last_context, m);
+        g->set_remained(_release_last_context, m);
         ending_sched(&g);
 
     } while (g->_cur_meta->tid != g->_main_tid);
@@ -491,9 +523,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
             fn = ready_to_run_in_worker;
         }
         ReadyToRunArgs args = {
-            g->tag(),
-            g->_cur_meta,
-            (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
+            g->tag(), g->_cur_meta, (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
         };
         g->set_remained(fn, &args);
         sched_to(pg, m->tid);
@@ -678,14 +708,18 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) {
     }
 #endif
     // Save errno so that errno is bthread-specific.
-    const int saved_errno = errno;
+    int saved_errno = errno;
     void* saved_unique_user_ptr = tls_unique_user_ptr;
 
     TaskMeta* const cur_meta = g->_cur_meta;
-    const int64_t now = butil::cpuwide_time_ns();
-    const int64_t elp_ns = now - g->_last_run_ns;
-    g->_last_run_ns = now;
+    int64_t now = butil::cpuwide_time_ns();
+    CPUTimeStat cpu_time_stat = g->_cpu_time_stat.load_unsafe();
+    int64_t elp_ns = now - cpu_time_stat.last_run_ns();
     cur_meta->stat.cputime_ns += elp_ns;
+    // Update cpu_time_stat.
+    cpu_time_stat.set_last_run_ns(now, is_main_task(g, next_meta->tid));
+    cpu_time_stat.add_cumulated_cputime_ns(elp_ns, is_main_task(g, cur_meta->tid));
+    g->_cpu_time_stat.store(cpu_time_stat);
 
     if (FLAGS_bthread_enable_cpu_clock_stat) {
         const int64_t cpu_thread_time = butil::cputhread_time_ns();
@@ -696,10 +730,7 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) {
     } else {
         g->_last_cpu_clock_ns = 0;
     }
-
-    if (cur_meta->tid != g->main_tid()) {
-        g->_cumulated_cputime_ns += elp_ns;
-    }
+
     ++cur_meta->stat.nswitch;
     ++ g->_nswitch;
     // Switch to the task
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index ccba9ba3..b79e69d8 100644
--- a/src/bthread/task_group.h
+++ b/src/bthread/task_group.h
@@ -29,6 +29,7 @@
 #include "bthread/remote_task_queue.h"     // RemoteTaskQueue
 #include "butil/resource_pool.h"           // ResourceId
 #include "bthread/parking_lot.h"
+#include "bthread/prime_offset.h"
 
 namespace bthread {
 
@@ -47,6 +48,35 @@ private:
     void* _value;
 };
 
+// Refer to https://rigtorp.se/isatomic/, On the modern CPU microarchitectures
+// (Skylake and Zen 2) AVX/AVX2 128b/256b aligned loads and stores are atomic
+// even though Intel and AMD officially doesn’t guarantee this.
+// On X86, SSE instructions can ensure atomic loads and stores.
+// Starting from Armv8.4-A, neon can ensure atomic loads and stores.
+// Otherwise, use mutex to guarantee atomicity.
+class AtomicInteger128 {
+public:
+    struct Value {
+        int64_t v1;
+        int64_t v2;
+    };
+
+    AtomicInteger128() = default;
+    explicit AtomicInteger128(Value value) : _value(value) {}
+
+    Value load() const;
+    Value load_unsafe() const {
+        return _value;
+    }
+
+    void store(Value value);
+
+private:
+    Value BAIDU_CACHELINE_ALIGNMENT _value{};
+    // Used to protect `_cpu_time_stat' when __x86_64__ and __ARM_NEON is not defined.
+    FastPthreadMutex _mutex;
+};
+
 // Thread-local group of tasks.
 // Notice that most methods involving context switching are static otherwise
 // pointer `this' may change after wakeup. The **pg parameters in following
@@ -148,7 +178,7 @@ public:
     { return _cur_meta->stack == _main_stack; }
 
     // Active time in nanoseconds spent by this TaskGroup.
-    int64_t cumulated_cputime_ns() const { return _cumulated_cputime_ns; }
+    int64_t cumulated_cputime_ns() const;
 
     // Push a bthread into the runqueue
     void ready_to_run(TaskMeta* meta, bool nosignal = false);
@@ -203,6 +233,70 @@ public:
 private:
 friend class TaskControl;
 
+    // Last scheduling time, task type and cumulated CPU time.
+    class CPUTimeStat {
+        static constexpr int64_t LAST_SCHEDULING_TIME_MASK = 0x7FFFFFFFFFFFFFFFLL;
+        static constexpr int64_t TASK_TYPE_MASK = 0x8000000000000000LL;
+    public:
+        CPUTimeStat() : _last_run_ns_and_type(0), _cumulated_cputime_ns(0) {}
+        CPUTimeStat(AtomicInteger128::Value value)
+            : _last_run_ns_and_type(value.v1), _cumulated_cputime_ns(value.v2) {}
+
+        // Convert to AtomicInteger128::Value for atomic operations.
+        explicit operator AtomicInteger128::Value() const {
+            return {_last_run_ns_and_type, _cumulated_cputime_ns};
+        }
+
+        void set_last_run_ns(int64_t last_run_ns, bool main_task) {
+            _last_run_ns_and_type = (last_run_ns & LAST_SCHEDULING_TIME_MASK) |
+                                    (static_cast<int64_t>(main_task) << 63);
+        }
+        int64_t last_run_ns() const {
+            return _last_run_ns_and_type & LAST_SCHEDULING_TIME_MASK;
+        }
+        int64_t last_run_ns_and_type() const {
+            return _last_run_ns_and_type;
+        }
+
+        bool is_main_task() const {
+            return _last_run_ns_and_type & TASK_TYPE_MASK;
+        }
+
+        void add_cumulated_cputime_ns(int64_t cputime_ns, bool main_task) {
+            if (main_task) {
+                return;
+            }
+            _cumulated_cputime_ns += cputime_ns;
+        }
+        int64_t cumulated_cputime_ns() const {
+            return _cumulated_cputime_ns;
+        }
+
+    private:
+        // The higher bit for task type, main task is 1, otherwise 0.
+        // Lowest 63 bits for last scheduling time.
+        int64_t _last_run_ns_and_type;
+        // Cumulated CPU time in nanoseconds.
+        int64_t _cumulated_cputime_ns;
+    };
+
+    class AtomicCPUTimeStat {
+    public:
+        CPUTimeStat load() const {
+            return _cpu_time_stat.load();
+        }
+        CPUTimeStat load_unsafe() const {
+            return _cpu_time_stat.load_unsafe();
+        }
+
+        void store(CPUTimeStat cpu_time_stat) {
+            _cpu_time_stat.store(AtomicInteger128::Value(cpu_time_stat));
+        }
+
+    private:
+        AtomicInteger128 _cpu_time_stat;
+    };
+
     // You shall use TaskControl::create_group to create new instance.
     explicit TaskGroup(TaskControl* c);
@@ -248,41 +342,43 @@ friend class TaskControl;
 
     void set_pl(ParkingLot* pl) { _pl = pl; }
 
-    TaskMeta* _cur_meta;
+    static bool is_main_task(TaskGroup* g, bthread_t tid) {
+        return g->_main_tid == tid;
+    }
+
+    TaskMeta* _cur_meta{NULL};
 
     // the control that this group belongs to
-    TaskControl* _control;
-    int _num_nosignal;
-    int _nsignaled;
-    // last scheduling time
-    int64_t _last_run_ns;
-    int64_t _cumulated_cputime_ns;
+    TaskControl* _control{NULL};
+    int _num_nosignal{0};
+    int _nsignaled{0};
+    AtomicCPUTimeStat _cpu_time_stat;
     // last thread cpu clock
-    int64_t _last_cpu_clock_ns;
+    int64_t _last_cpu_clock_ns{0};
 
-    size_t _nswitch;
-    RemainedFn _last_context_remained;
-    void* _last_context_remained_arg;
+    size_t _nswitch{0};
+    RemainedFn _last_context_remained{NULL};
+    void* _last_context_remained_arg{NULL};
 
-    ParkingLot* _pl;
+    ParkingLot* _pl{NULL};
 #ifndef BTHREAD_DONT_SAVE_PARKING_STATE
     ParkingLot::State _last_pl_state;
 #endif
-    size_t _steal_seed;
-    size_t _steal_offset;
-    ContextualStack* _main_stack;
-    bthread_t _main_tid;
+    size_t _steal_seed{butil::fast_rand()};
+    size_t _steal_offset{prime_offset(_steal_seed)};
+    ContextualStack* _main_stack{NULL};
+    bthread_t _main_tid{INVALID_BTHREAD};
     WorkStealingQueue<bthread_t> _rq;
     RemoteTaskQueue _remote_rq;
-    int _remote_num_nosignal;
-    int _remote_nsignaled;
+    int _remote_num_nosignal{0};
+    int _remote_nsignaled{0};
 
-    int _sched_recursive_guard;
+    int _sched_recursive_guard{0};
     // tag of this taskgroup
-    bthread_tag_t _tag;
+    bthread_tag_t _tag{BTHREAD_TAG_DEFAULT};
     // Worker thread id.
-    pid_t _tid;
+    pid_t _tid{-1};
 };
 
 } // namespace bthread

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org
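
P.S. (editor's illustration, not part of the commit): the CPUTimeStat class added to src/bthread/task_group.h packs the last scheduling time into the low 63 bits of one int64_t and a "main task is running" flag into the top bit, so that this word plus the cumulated CPU time can be read together as one 128-bit value via AtomicInteger128. Below is a minimal, self-contained C++ sketch of that encoding; the names mirror the patch, but the toy struct and main() are illustrative only, and the flag is set with an explicit mask where the patch shifts the bool into bit 63.

#include <cassert>
#include <cstdint>
#include <cstdio>

namespace sketch {
// Low 63 bits: last scheduling time in ns. Top bit: main-task flag.
constexpr int64_t LAST_SCHEDULING_TIME_MASK = 0x7FFFFFFFFFFFFFFFLL;
constexpr int64_t TASK_TYPE_MASK = static_cast<int64_t>(0x8000000000000000ULL);

struct CPUTimeStat {
    int64_t last_run_ns_and_type = 0;  // bit 63: main task flag, bits 0..62: time
    int64_t cumulated_cputime_ns = 0;  // CPU time accumulated for non-main tasks only

    void set_last_run_ns(int64_t last_run_ns, bool main_task) {
        last_run_ns_and_type = (last_run_ns & LAST_SCHEDULING_TIME_MASK) |
                               (main_task ? TASK_TYPE_MASK : 0);
    }
    int64_t last_run_ns() const {
        return last_run_ns_and_type & LAST_SCHEDULING_TIME_MASK;
    }
    bool is_main_task() const {
        return (last_run_ns_and_type & TASK_TYPE_MASK) != 0;
    }
    void add_cumulated_cputime_ns(int64_t ns, bool main_task) {
        // Elapsed time of the main task is not counted as worker usage.
        if (!main_task) { cumulated_cputime_ns += ns; }
    }
};
}  // namespace sketch

int main() {
    sketch::CPUTimeStat s;
    s.set_last_run_ns(123456789, /*main_task=*/true);    // flag set, time preserved
    assert(s.last_run_ns() == 123456789 && s.is_main_task());
    s.add_cumulated_cputime_ns(500, /*main_task=*/true);  // main-task time is skipped
    assert(s.cumulated_cputime_ns == 0);
    s.set_last_run_ns(987654321, /*main_task=*/false);    // worker bthread, flag cleared
    s.add_cumulated_cputime_ns(500, /*main_task=*/false);
    assert(!s.is_main_task() && s.cumulated_cputime_ns == 500);
    std::printf("ok\n");
    return 0;
}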