This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
     new 5b0a6c1b epoll bthread deal first (#2819)
5b0a6c1b is described below

commit 5b0a6c1bdaf1b15974e5d1deb914e9ea025a4de5
Author: Jade <zhengj...@qq.com>
AuthorDate: Thu May 22 10:14:06 2025 +0800

    epoll bthread deal first (#2819)

    * add global priority queue
    * add parking lot license
    * fix scale workers
    * fix expected state
    * add global priority queue
    * fix setconcurrency ut error
    * fix rebase error
    * add global priority bthread flags
    * deal some comments
    * fix comments

    ---------

    Co-authored-by: jiazheng.jia <jiazheng....@antgroup.com>
---
 src/brpc/event_dispatcher_epoll.cpp      |  7 ++++---
 src/bthread/parking_lot.h                |  1 +
 src/bthread/task_control.cpp             | 10 ++++++++++
 src/bthread/task_control.h               |  5 +++++
 src/bthread/task_group.cpp               | 18 +++++++++++++++---
 src/bthread/task_group.h                 |  2 ++
 src/bthread/task_group_inl.h             |  2 +-
 src/bthread/types.h                      |  5 +++++
 test/bthread_setconcurrency_unittest.cpp |  1 -
 9 files changed, 43 insertions(+), 8 deletions(-)

diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp
index 0ea404ff..005119f7 100644
--- a/src/brpc/event_dispatcher_epoll.cpp
+++ b/src/brpc/event_dispatcher_epoll.cpp
@@ -26,7 +26,7 @@ EventDispatcher::EventDispatcher()
     : _event_dispatcher_fd(-1)
     , _stop(false)
     , _tid(0)
-    , _thread_attr(BTHREAD_ATTR_NORMAL) {
+    , _thread_attr(BTHREAD_ATTR_EPOLL) {
     _event_dispatcher_fd = epoll_create(1024 * 1024);
     if (_event_dispatcher_fd < 0) {
         PLOG(FATAL) << "Fail to create epoll";
@@ -69,8 +69,9 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {

     // Set _thread_attr before creating epoll thread to make sure
     // everyting seems sane to the thread.
-    _thread_attr = consumer_thread_attr ?
-        *consumer_thread_attr : BTHREAD_ATTR_NORMAL;
+    if (consumer_thread_attr) {
+        _thread_attr = *consumer_thread_attr | BTHREAD_GLOBAL_PRIORITY;
+    }

     //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread
     // that created by epoll_wait() never to quit.
diff --git a/src/bthread/parking_lot.h b/src/bthread/parking_lot.h
index 9bb48ad7..620e3c89 100644
--- a/src/bthread/parking_lot.h
+++ b/src/bthread/parking_lot.h
@@ -70,6 +70,7 @@ public:
         _pending_signal.fetch_or(1);
         futex_wake_private(&_pending_signal, 10000);
     }
+
 private:
     // higher 31 bits for signalling, LSB for stopping.
     butil::atomic<int> _pending_signal;
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 545bc322..47b2199b 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -183,6 +183,7 @@ TaskControl::TaskControl()
     , _signal_per_second(&_cumulated_signal_count)
     , _status(print_rq_sizes_in_the_tc, this)
     , _nbthreads("bthread_count")
+    , _priority_queues(FLAGS_task_group_ntags)
     , _pl(FLAGS_task_group_ntags)
 {}

@@ -207,6 +208,10 @@ int TaskControl::init(int concurrency) {
         _tagged_worker_usage_second.push_back(new bvar::PerSecond<bvar::PassiveStatus<double>>(
             "bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1));
         _tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", tag_str));
+        if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
+            LOG(FATAL) << "Fail to init _priority_q";
+            return -1;
+        }
     }

     // Make sure TimerThread is ready.
@@ -429,6 +434,11 @@ int TaskControl::_destroy_group(TaskGroup* g) {

 bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
     auto tag = tls_task_group->tag();
+
+    if (_priority_queues[tag].steal(tid)) {
+        return true;
+    }
+
     // 1: Acquiring fence is paired with releasing fence in _add_group to
     // avoid accessing uninitialized slot of _groups.
     const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_acquire/*1*/);
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 95820a86..2a2b76d6 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -97,6 +97,10 @@ public:
     std::string stack_trace(bthread_t tid);
 #endif // BRPC_BTHREAD_TRACER

+    void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
+        _priority_queues[tag].push(tid);
+    }
+
 private:
     typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
     static const int PARKING_LOT_NUM = 4;
@@ -153,6 +157,7 @@ private:
     std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;
     std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second;
     std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
+    std::vector<WorkStealingQueue<bthread_t>> _priority_queues;

     std::vector<TaggedParkingLot> _pl;

diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 6abc444b..c18f7507 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -482,12 +482,19 @@ int TaskGroup::start_foreground(TaskGroup** pg,
     } else {
         // NOSIGNAL affects current task, not the new task.
         RemainedFn fn = NULL;
-        if (g->current_task()->about_to_quit) {
+        auto& cur_attr = g->_cur_meta->attr;
+        if (cur_attr.flags & BTHREAD_GLOBAL_PRIORITY) {
+            fn = priority_to_run;
+        } else if (g->current_task()->about_to_quit) {
             fn = ready_to_run_in_worker_ignoresignal;
         } else {
             fn = ready_to_run_in_worker;
         }
-        ReadyToRunArgs args = { g->_cur_meta, (bool)(using_attr.flags & BTHREAD_NOSIGNAL) };
+        ReadyToRunArgs args = {
+            g->tag(),
+            g->_cur_meta,
+            (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
+        };
         g->set_remained(fn, &args);
         TaskGroup::sched_to(pg, m->tid);
     }
@@ -861,6 +868,11 @@ void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) {
     return tls_task_group->push_rq(args->meta->tid);
 }

+void TaskGroup::priority_to_run(void* args_in) {
+    ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
+    return tls_task_group->control()->push_priority_queue(args->tag, args->meta->tid);
+}
+
 struct SleepArgs {
     uint64_t timeout_us;
     bthread_t tid;
@@ -1035,7 +1047,7 @@ int TaskGroup::interrupt(bthread_t tid, TaskControl* c, bthread_tag_t tag) {

 void TaskGroup::yield(TaskGroup** pg) {
     TaskGroup* g = *pg;
-    ReadyToRunArgs args = { g->_cur_meta, false };
+    ReadyToRunArgs args = { g->tag(), g->_cur_meta, false };
     g->set_remained(ready_to_run_in_worker, &args);
     sched(pg);
 }
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index 488ad492..ccba9ba3 100644
--- a/src/bthread/task_group.h
+++ b/src/bthread/task_group.h
@@ -221,11 +221,13 @@ friend class TaskControl;
     static void _release_last_context(void*);
     static void _add_sleep_event(void*);
     struct ReadyToRunArgs {
+        bthread_tag_t tag;
         TaskMeta* meta;
         bool nosignal;
     };
     static void ready_to_run_in_worker(void*);
     static void ready_to_run_in_worker_ignoresignal(void*);
+    static void priority_to_run(void*);

     // Wait for a task to run.
     // Returns true on success, false is treated as permanent error and the
diff --git a/src/bthread/task_group_inl.h b/src/bthread/task_group_inl.h
index 4842bf69..aec1a284 100644
--- a/src/bthread/task_group_inl.h
+++ b/src/bthread/task_group_inl.h
@@ -51,7 +51,7 @@ inline void TaskGroup::exchange(TaskGroup** pg, TaskMeta* next_meta) {
     if (g->is_current_pthread_task()) {
         return g->ready_to_run(next_meta);
     }
-    ReadyToRunArgs args = { g->_cur_meta, false };
+    ReadyToRunArgs args = { g->tag(), g->_cur_meta, false };
     g->set_remained((g->current_task()->about_to_quit
                      ? ready_to_run_in_worker_ignoresignal
                      : ready_to_run_in_worker),
diff --git a/src/bthread/types.h b/src/bthread/types.h
index 60f5c7fe..c0f23f1c 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -53,6 +53,7 @@ static const bthread_attrflags_t BTHREAD_LOG_CONTEXT_SWITCH = 16;
 static const bthread_attrflags_t BTHREAD_NOSIGNAL = 32;
 static const bthread_attrflags_t BTHREAD_NEVER_QUIT = 64;
 static const bthread_attrflags_t BTHREAD_INHERIT_SPAN = 128;
+static const bthread_attrflags_t BTHREAD_GLOBAL_PRIORITY = 256;

 // Key of thread-local data, created by bthread_key_create.
 typedef struct {
@@ -137,6 +138,10 @@ static const bthread_attr_t BTHREAD_ATTR_NORMAL = {BTHREAD_STACKTYPE_NORMAL, 0,
 static const bthread_attr_t BTHREAD_ATTR_LARGE = {BTHREAD_STACKTYPE_LARGE, 0, NULL,
                                                   BTHREAD_TAG_INVALID};

+// epoll bthread
+static const bthread_attr_t BTHREAD_ATTR_EPOLL = {
+    BTHREAD_STACKTYPE_NORMAL, BTHREAD_GLOBAL_PRIORITY, NULL, BTHREAD_TAG_INVALID};
+
 // bthreads created with this attribute will print log when it's started,
 // context-switched, finished.
 static const bthread_attr_t BTHREAD_ATTR_DEBUG = {
diff --git a/test/bthread_setconcurrency_unittest.cpp b/test/bthread_setconcurrency_unittest.cpp
index 9e2e50d1..0843918f 100644
--- a/test/bthread_setconcurrency_unittest.cpp
+++ b/test/bthread_setconcurrency_unittest.cpp
@@ -214,7 +214,6 @@ int concurrency_by_tag(int num) {

 TEST(BthreadTest, concurrency_by_tag) {
     ASSERT_EQ(concurrency_by_tag(1), false);
-    auto tag_con = bthread_getconcurrency_by_tag(0);
     auto con = bthread_getconcurrency();
     ASSERT_EQ(concurrency_by_tag(con), true);
     ASSERT_EQ(concurrency_by_tag(con + 1), true);

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org