This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new f949962e Support custom ParkingLot number (#3033) f949962e is described below commit f949962e141fecba92bbd399d1da75fe2ac6862d Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Mon Aug 18 10:18:27 2025 +0800 Support custom ParkingLot number (#3033) * Support custom ParkingLot number * Validate bthread_parking_lot_of_each_tag flag * Fast path, no need to futex_wait --- src/bthread/bthread.cpp | 17 ++++++++++++++++- src/bthread/parking_lot.h | 4 ++++ src/bthread/task_control.cpp | 21 ++++++++++----------- src/bthread/task_control.h | 8 ++++---- src/bthread/types.h | 3 +++ 5 files changed, 37 insertions(+), 16 deletions(-) diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index 35d87477..a5f178e3 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -59,6 +59,21 @@ DEFINE_int32(bthread_concurrency_by_tag, 8 + BTHREAD_EPOLL_THREAD_NUM, "Number of pthread workers of FLAGS_bthread_current_tag"); BUTIL_VALIDATE_GFLAG(bthread_concurrency_by_tag, validate_bthread_concurrency_by_tag); +DEFINE_int32(bthread_parking_lot_of_each_tag, 4, "Number of parking lots of each tag"); +BUTIL_VALIDATE_GFLAG(bthread_parking_lot_of_each_tag, [](const char*, int32_t val) { + if (val < BTHREAD_MIN_PARKINGLOT) { + LOG(ERROR) << "bthread_parking_lot_of_each_tag must be greater than or equal to " + << BTHREAD_MIN_PARKINGLOT; + return false; + } + if (val > BTHREAD_MAX_PARKINGLOT) { + LOG(ERROR) << "bthread_parking_lot_of_each_tag must be less than or equal to " + << BTHREAD_MAX_PARKINGLOT; + return false; + } + return true; +}); + static bool never_set_bthread_concurrency = true; BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>), atomic_size_match); @@ -216,7 +231,7 @@ static bool validate_bthread_current_tag(const char*, int32_t val) { return false; } BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex); - auto c = bthread::get_task_control(); + auto c = 
get_task_control(); if (c == NULL) { FLAGS_bthread_concurrency_by_tag = 8 + BTHREAD_EPOLL_THREAD_NUM; return true; diff --git a/src/bthread/parking_lot.h b/src/bthread/parking_lot.h index 620e3c89..315e9956 100644 --- a/src/bthread/parking_lot.h +++ b/src/bthread/parking_lot.h @@ -60,6 +60,10 @@ public: // Wait for tasks. // If the `expected_state' does not match, wait() may finish directly. void wait(const State& expected_state) { + if (get_state().val != expected_state.val) { + // Fast path, no need to futex_wait. + return; + } _waiter_num.fetch_add(1, butil::memory_order_relaxed); futex_wait_private(&_pending_signal, expected_state.val, NULL); _waiter_num.fetch_sub(1, butil::memory_order_relaxed); diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index 0cc3754a..05ceec09 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -46,6 +46,7 @@ namespace bthread { DECLARE_int32(bthread_concurrency); DECLARE_int32(bthread_min_concurrency); +DECLARE_int32(bthread_parking_lot_of_each_tag); extern pthread_mutex_t g_task_control_mutex; extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; @@ -186,7 +187,8 @@ TaskControl::TaskControl() , _status(print_rq_sizes_in_the_tc, this) , _nbthreads("bthread_count") , _priority_queues(FLAGS_task_group_ntags) - , _pl(FLAGS_task_group_ntags) + , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag) + , _tagged_pl(FLAGS_task_group_ntags) {} int TaskControl::init(int concurrency) { @@ -326,7 +328,7 @@ void TaskControl::stop_and_join() { [](butil::atomic<size_t>& index) { index.store(0, butil::memory_order_relaxed); }); } for (int i = 0; i < FLAGS_task_group_ntags; ++i) { - for (auto& pl : _pl[i]) { + for (auto& pl : _tagged_pl[i]) { pl.stop(); } } @@ -367,7 +369,7 @@ int TaskControl::_add_group(TaskGroup* g, bthread_tag_t tag) { return -1; } g->set_tag(tag); - g->set_pl(&_pl[tag][butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM]); + 
g->set_pl(&_tagged_pl[tag][butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag]); size_t ngroup = _tagged_ngroup[tag].load(butil::memory_order_relaxed); if (ngroup < (size_t)BTHREAD_MAX_CONCURRENCY) { _tagged_groups[tag][ngroup] = g; @@ -482,14 +484,11 @@ void TaskControl::signal_task(int num_task, bthread_tag_t tag) { num_task = 2; } auto& pl = tag_pl(tag); - int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM; - num_task -= pl[start_index].signal(1); - if (num_task > 0) { - for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) { - if (++start_index >= PARKING_LOT_NUM) { - start_index = 0; - } - num_task -= pl[start_index].signal(1); + size_t start_index = butil::fmix64(pthread_numeric_id()) % _pl_num_of_each_tag; + for (size_t i = 0; i < _pl_num_of_each_tag && num_task > 0; ++i) { + num_task -= pl[start_index].signal(1); + if (++start_index >= _pl_num_of_each_tag) { + start_index = 0; } } if (num_task > 0 && diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 11587b29..2426b00c 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -103,8 +103,7 @@ public: private: typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups; - static const int PARKING_LOT_NUM = 4; - typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot; + typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot; // Add/Remove a TaskGroup. // Returns 0 on success, -1 otherwise. 
int _add_group(TaskGroup*, bthread_tag_t tag); @@ -117,7 +116,7 @@ private: butil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; } // Tag parking slot - TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; } + TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; } static void delete_task_group(void* arg); @@ -159,7 +158,8 @@ private: std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads; std::vector<WorkStealingQueue<bthread_t>> _priority_queues; - std::vector<TaggedParkingLot> _pl; + size_t _pl_num_of_each_tag; + std::vector<TaggedParkingLot> _tagged_pl; #ifdef BRPC_BTHREAD_TRACER TaskTracer _task_tracer; diff --git a/src/bthread/types.h b/src/bthread/types.h index c0f23f1c..30368f68 100644 --- a/src/bthread/types.h +++ b/src/bthread/types.h @@ -154,6 +154,9 @@ static const bthread_t BTHREAD_ATOMIC_INIT = 0; // Min/Max number of work pthreads. static const int BTHREAD_MIN_CONCURRENCY = 3 + BTHREAD_EPOLL_THREAD_NUM; static const int BTHREAD_MAX_CONCURRENCY = 1024; +// Min/max number of ParkingLot. +static const int BTHREAD_MIN_PARKINGLOT = 4; +static const int BTHREAD_MAX_PARKINGLOT = 1024; typedef struct { void* impl; --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org