This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 018ce775 Add flag for bthread priority queue, the default is false 
(#3078)
018ce775 is described below

commit 018ce775f2914f35b74436107f0d647538757e16
Author: Bright Chen <[email protected]>
AuthorDate: Mon Sep 1 14:09:51 2025 +0800

    Add flag for bthread priority queue, the default is false (#3078)
    
    * Add flag for bthread priority queue, the default is false
    
    * Only EventDispatcher thread has flag BTHREAD_GLOBAL_PRIORITY
---
 src/brpc/event_dispatcher.h          |  2 +-
 src/brpc/event_dispatcher_epoll.cpp  | 12 +++++++-----
 src/brpc/event_dispatcher_kqueue.cpp |  8 ++++++--
 src/brpc/socket.cpp                  |  2 --
 src/bthread/task_control.cpp         |  3 +++
 src/bthread/task_control.h           |  2 ++
 src/bthread/task_group.cpp           |  2 +-
 src/bthread/types.h                  |  4 ----
 8 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h
index 51c404e2..fd91d3c5 100644
--- a/src/brpc/event_dispatcher.h
+++ b/src/brpc/event_dispatcher.h
@@ -99,7 +99,7 @@ public:
     virtual ~EventDispatcher();
 
     // Start this dispatcher in a bthread.
-    // Use |*consumer_thread_attr| (if it's not NULL) as the attribute to
+    // Use |*thread_attr| (if it's not NULL) as the attribute to
     // create bthreads running user callbacks.
     // Returns 0 on success, -1 otherwise.
     virtual int Start(const bthread_attr_t* thread_attr);
diff --git a/src/brpc/event_dispatcher_epoll.cpp 
b/src/brpc/event_dispatcher_epoll.cpp
index 005119f7..5a4a6370 100644
--- a/src/brpc/event_dispatcher_epoll.cpp
+++ b/src/brpc/event_dispatcher_epoll.cpp
@@ -26,7 +26,7 @@ EventDispatcher::EventDispatcher()
     : _event_dispatcher_fd(-1)
     , _stop(false)
     , _tid(0)
-    , _thread_attr(BTHREAD_ATTR_EPOLL) {
+    , _thread_attr(BTHREAD_ATTR_NORMAL) {
     _event_dispatcher_fd = epoll_create(1024 * 1024);
     if (_event_dispatcher_fd < 0) {
         PLOG(FATAL) << "Fail to create epoll";
@@ -55,7 +55,7 @@ EventDispatcher::~EventDispatcher() {
     }
 }
 
-int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
+int EventDispatcher::Start(const bthread_attr_t* thread_attr) {
     if (_event_dispatcher_fd < 0) {
         LOG(FATAL) << "epoll was not created";
         return -1;
@@ -69,13 +69,15 @@ int EventDispatcher::Start(const bthread_attr_t* 
consumer_thread_attr) {
 
     // Set _thread_attr before creating epoll thread to make sure
     // everyting seems sane to the thread.
-    if (consumer_thread_attr) {
-        _thread_attr = *consumer_thread_attr | BTHREAD_GLOBAL_PRIORITY;
+    if (thread_attr) {
+        _thread_attr = *thread_attr;
     }
 
     //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it 
will cause new bthread
     // that created by epoll_wait() never to quit.
-    bthread_attr_t epoll_thread_attr = _thread_attr | BTHREAD_NEVER_QUIT;
+    // Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY.
+    bthread_attr_t epoll_thread_attr =
+        _thread_attr | BTHREAD_NEVER_QUIT | BTHREAD_GLOBAL_PRIORITY;
 
     // Polling thread uses the same attr for consumer threads (NORMAL right
     // now). Previously, we used small stack (32KB) which may be overflowed
diff --git a/src/brpc/event_dispatcher_kqueue.cpp 
b/src/brpc/event_dispatcher_kqueue.cpp
index a1790486..48b28147 100644
--- a/src/brpc/event_dispatcher_kqueue.cpp
+++ b/src/brpc/event_dispatcher_kqueue.cpp
@@ -69,11 +69,15 @@ int EventDispatcher::Start(const bthread_attr_t* 
thread_attr) {
 
     // Set _thread_attr before creating kqueue thread to make sure
     // everyting seems sane to the thread.
-    _thread_attr = (thread_attr ? *thread_attr : BTHREAD_ATTR_NORMAL);
+    if (thread_attr) {
+        _thread_attr = *thread_attr;
+    }
 
     //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it 
will cause new bthread
     // that created by kevent() never to quit.
-    bthread_attr_t kqueue_thread_attr = _thread_attr | BTHREAD_NEVER_QUIT;
+    // Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY.
+    bthread_attr_t kqueue_thread_attr =
+        _thread_attr | BTHREAD_NEVER_QUIT | BTHREAD_GLOBAL_PRIORITY;
 
     // Polling thread uses the same attr for consumer threads (NORMAL right
     // now). Previously, we used small stack (32KB) which may be overflowed
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 075e03ac..6de247c3 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -2261,8 +2261,6 @@ int Socket::OnInputEvent(void* user_data, uint32_t events,
         bthread_attr_t attr = thread_attr;
         attr.keytable_pool = p->_keytable_pool;
         attr.tag = bthread_self_tag();
-        // Only event dispatcher thread has flag BTHREAD_GLOBAL_PRIORITY
-        attr.flags = attr.flags & (~BTHREAD_GLOBAL_PRIORITY);
         if (FLAGS_usercode_in_coroutine) {
             ProcessEvent(p);
 #if BRPC_WITH_RDMA
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 1bdcf0c0..7d121349 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -45,6 +45,8 @@ DEFINE_bool(task_group_set_worker_name, true,
 
 namespace bthread {
 
+DEFINE_bool(enable_bthread_priority_queue, false, "Whether to enable priority 
queue");
+
 DECLARE_int32(bthread_concurrency);
 DECLARE_int32(bthread_min_concurrency);
 DECLARE_int32(bthread_parking_lot_of_each_tag);
@@ -187,6 +189,7 @@ TaskControl::TaskControl()
     , _signal_per_second(&_cumulated_signal_count)
     , _status(print_rq_sizes_in_the_tc, this)
     , _nbthreads("bthread_count")
+    , _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
     , _priority_queues(FLAGS_task_group_ntags)
     , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
     , _tagged_pl(FLAGS_task_group_ntags)
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 2426b00c..4d666025 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -156,6 +156,8 @@ private:
     std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;
     std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> 
_tagged_worker_usage_second;
     std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
+
+    bool _enable_priority_queue;
     std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
 
     size_t _pl_num_of_each_tag;
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 773a442b..67f029a0 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -515,7 +515,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
         // NOSIGNAL affects current task, not the new task.
         RemainedFn fn = NULL;
         auto& cur_attr = g->_cur_meta->attr;
-        if (cur_attr.flags & BTHREAD_GLOBAL_PRIORITY) {
+        if (g->_control->_enable_priority_queue && cur_attr.flags & 
BTHREAD_GLOBAL_PRIORITY) {
             fn = priority_to_run;
         } else if (g->current_task()->about_to_quit) {
             fn = ready_to_run_in_worker_ignoresignal;
diff --git a/src/bthread/types.h b/src/bthread/types.h
index 30368f68..a09c2e38 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -138,10 +138,6 @@ static const bthread_attr_t BTHREAD_ATTR_NORMAL = 
{BTHREAD_STACKTYPE_NORMAL, 0,
 static const bthread_attr_t BTHREAD_ATTR_LARGE = {BTHREAD_STACKTYPE_LARGE, 0, 
NULL,
                                                   BTHREAD_TAG_INVALID};
 
-// epoll bthread
-static const bthread_attr_t BTHREAD_ATTR_EPOLL = {
-    BTHREAD_STACKTYPE_NORMAL, BTHREAD_GLOBAL_PRIORITY, NULL, 
BTHREAD_TAG_INVALID};
-
 // bthreads created with this attribute will print log when it's started,
 // context-switched, finished.
 static const bthread_attr_t BTHREAD_ATTR_DEBUG = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to