This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b0a6c1b epoll bthread deal first (#2819)
5b0a6c1b is described below

commit 5b0a6c1bdaf1b15974e5d1deb914e9ea025a4de5
Author: Jade <zhengj...@qq.com>
AuthorDate: Thu May 22 10:14:06 2025 +0800

    epoll bthread deal first (#2819)
    
    * add global priority queue
    
    * add parking lot license
    
    * fix scale workers
    
    * fix expected state
    
    * add global priority queue
    
    * fix setconcurrency ut error
    
    * fix rebase error
    
    * add global priority bthread flags
    
    * deal some comments
    
    * fix comments
    
    ---------
    
    Co-authored-by: jiazheng.jia <jiazheng....@antgroup.com>
---
 src/brpc/event_dispatcher_epoll.cpp      |  7 ++++---
 src/bthread/parking_lot.h                |  1 +
 src/bthread/task_control.cpp             | 10 ++++++++++
 src/bthread/task_control.h               |  5 +++++
 src/bthread/task_group.cpp               | 18 +++++++++++++++---
 src/bthread/task_group.h                 |  2 ++
 src/bthread/task_group_inl.h             |  2 +-
 src/bthread/types.h                      |  5 +++++
 test/bthread_setconcurrency_unittest.cpp |  1 -
 9 files changed, 43 insertions(+), 8 deletions(-)

diff --git a/src/brpc/event_dispatcher_epoll.cpp 
b/src/brpc/event_dispatcher_epoll.cpp
index 0ea404ff..005119f7 100644
--- a/src/brpc/event_dispatcher_epoll.cpp
+++ b/src/brpc/event_dispatcher_epoll.cpp
@@ -26,7 +26,7 @@ EventDispatcher::EventDispatcher()
     : _event_dispatcher_fd(-1)
     , _stop(false)
     , _tid(0)
-    , _thread_attr(BTHREAD_ATTR_NORMAL) {
+    , _thread_attr(BTHREAD_ATTR_EPOLL) {
     _event_dispatcher_fd = epoll_create(1024 * 1024);
     if (_event_dispatcher_fd < 0) {
         PLOG(FATAL) << "Fail to create epoll";
@@ -69,8 +69,9 @@ int EventDispatcher::Start(const bthread_attr_t* 
consumer_thread_attr) {
 
     // Set _thread_attr before creating epoll thread to make sure
     // everyting seems sane to the thread.
-    _thread_attr = consumer_thread_attr  ?
-        *consumer_thread_attr : BTHREAD_ATTR_NORMAL;
+    if (consumer_thread_attr) {
+        _thread_attr = *consumer_thread_attr | BTHREAD_GLOBAL_PRIORITY;
+    }
 
     //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it 
will cause new bthread
     // that created by epoll_wait() never to quit.
diff --git a/src/bthread/parking_lot.h b/src/bthread/parking_lot.h
index 9bb48ad7..620e3c89 100644
--- a/src/bthread/parking_lot.h
+++ b/src/bthread/parking_lot.h
@@ -70,6 +70,7 @@ public:
         _pending_signal.fetch_or(1);
         futex_wake_private(&_pending_signal, 10000);
     }
+
 private:
     // higher 31 bits for signalling, LSB for stopping.
     butil::atomic<int> _pending_signal;
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 545bc322..47b2199b 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -183,6 +183,7 @@ TaskControl::TaskControl()
     , _signal_per_second(&_cumulated_signal_count)
     , _status(print_rq_sizes_in_the_tc, this)
     , _nbthreads("bthread_count")
+    , _priority_queues(FLAGS_task_group_ntags)
     , _pl(FLAGS_task_group_ntags)
 {}
 
@@ -207,6 +208,10 @@ int TaskControl::init(int concurrency) {
         _tagged_worker_usage_second.push_back(new 
bvar::PerSecond<bvar::PassiveStatus<double>>(
             "bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 
1));
         _tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", 
tag_str));
+        if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
+            LOG(FATAL) << "Fail to init _priority_q";
+            return -1;
+        }
     }
 
     // Make sure TimerThread is ready.
@@ -429,6 +434,11 @@ int TaskControl::_destroy_group(TaskGroup* g) {
 
 bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
     auto tag = tls_task_group->tag();
+
+    if (_priority_queues[tag].steal(tid)) {
+        return true;
+    }
+
     // 1: Acquiring fence is paired with releasing fence in _add_group to
     // avoid accessing uninitialized slot of _groups.
     const size_t ngroup = 
tag_ngroup(tag).load(butil::memory_order_acquire/*1*/);
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 95820a86..2a2b76d6 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -97,6 +97,10 @@ public:
     std::string stack_trace(bthread_t tid);
 #endif // BRPC_BTHREAD_TRACER
 
+    void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
+        _priority_queues[tag].push(tid);
+    }
+
 private:
     typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
     static const int PARKING_LOT_NUM = 4;
@@ -153,6 +157,7 @@ private:
     std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;
     std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> 
_tagged_worker_usage_second;
     std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
+    std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
 
     std::vector<TaggedParkingLot> _pl;
 
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 6abc444b..c18f7507 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -482,12 +482,19 @@ int TaskGroup::start_foreground(TaskGroup** pg,
     } else {
         // NOSIGNAL affects current task, not the new task.
         RemainedFn fn = NULL;
-        if (g->current_task()->about_to_quit) {
+        auto& cur_attr = g->_cur_meta->attr;
+        if (cur_attr.flags & BTHREAD_GLOBAL_PRIORITY) {
+            fn = priority_to_run;
+        } else if (g->current_task()->about_to_quit) {
             fn = ready_to_run_in_worker_ignoresignal;
         } else {
             fn = ready_to_run_in_worker;
         }
-        ReadyToRunArgs args = { g->_cur_meta, (bool)(using_attr.flags & 
BTHREAD_NOSIGNAL) };
+        ReadyToRunArgs args = {
+            g->tag(),
+            g->_cur_meta,
+            (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
+        };
         g->set_remained(fn, &args);
         TaskGroup::sched_to(pg, m->tid);
     }
@@ -861,6 +868,11 @@ void TaskGroup::ready_to_run_in_worker_ignoresignal(void* 
args_in) {
     return tls_task_group->push_rq(args->meta->tid);
 }
 
+void TaskGroup::priority_to_run(void* args_in) {
+    ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
+    return tls_task_group->control()->push_priority_queue(args->tag, 
args->meta->tid);
+}
+
 struct SleepArgs {
     uint64_t timeout_us;
     bthread_t tid;
@@ -1035,7 +1047,7 @@ int TaskGroup::interrupt(bthread_t tid, TaskControl* c, 
bthread_tag_t tag) {
 
 void TaskGroup::yield(TaskGroup** pg) {
     TaskGroup* g = *pg;
-    ReadyToRunArgs args = {  g->_cur_meta, false };
+    ReadyToRunArgs args = { g->tag(), g->_cur_meta, false };
     g->set_remained(ready_to_run_in_worker, &args);
     sched(pg);
 }
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index 488ad492..ccba9ba3 100644
--- a/src/bthread/task_group.h
+++ b/src/bthread/task_group.h
@@ -221,11 +221,13 @@ friend class TaskControl;
     static void _release_last_context(void*);
     static void _add_sleep_event(void*);
     struct ReadyToRunArgs {
+        bthread_tag_t tag;
         TaskMeta* meta;
         bool nosignal;
     };
     static void ready_to_run_in_worker(void*);
     static void ready_to_run_in_worker_ignoresignal(void*);
+    static void priority_to_run(void*);
 
     // Wait for a task to run.
     // Returns true on success, false is treated as permanent error and the
diff --git a/src/bthread/task_group_inl.h b/src/bthread/task_group_inl.h
index 4842bf69..aec1a284 100644
--- a/src/bthread/task_group_inl.h
+++ b/src/bthread/task_group_inl.h
@@ -51,7 +51,7 @@ inline void TaskGroup::exchange(TaskGroup** pg, TaskMeta* 
next_meta) {
     if (g->is_current_pthread_task()) {
         return g->ready_to_run(next_meta);
     }
-    ReadyToRunArgs args = { g->_cur_meta, false };
+    ReadyToRunArgs args = { g->tag(), g->_cur_meta, false };
     g->set_remained((g->current_task()->about_to_quit
                      ? ready_to_run_in_worker_ignoresignal
                      : ready_to_run_in_worker),
diff --git a/src/bthread/types.h b/src/bthread/types.h
index 60f5c7fe..c0f23f1c 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -53,6 +53,7 @@ static const bthread_attrflags_t BTHREAD_LOG_CONTEXT_SWITCH = 
16;
 static const bthread_attrflags_t BTHREAD_NOSIGNAL = 32;
 static const bthread_attrflags_t BTHREAD_NEVER_QUIT = 64;
 static const bthread_attrflags_t BTHREAD_INHERIT_SPAN = 128;
+static const bthread_attrflags_t BTHREAD_GLOBAL_PRIORITY = 256;
 
 // Key of thread-local data, created by bthread_key_create.
 typedef struct {
@@ -137,6 +138,10 @@ static const bthread_attr_t BTHREAD_ATTR_NORMAL = 
{BTHREAD_STACKTYPE_NORMAL, 0,
 static const bthread_attr_t BTHREAD_ATTR_LARGE = {BTHREAD_STACKTYPE_LARGE, 0, 
NULL,
                                                   BTHREAD_TAG_INVALID};
 
+// epoll bthread
+static const bthread_attr_t BTHREAD_ATTR_EPOLL = {
+    BTHREAD_STACKTYPE_NORMAL, BTHREAD_GLOBAL_PRIORITY, NULL, 
BTHREAD_TAG_INVALID};
+
 // bthreads created with this attribute will print log when it's started,
 // context-switched, finished.
 static const bthread_attr_t BTHREAD_ATTR_DEBUG = {
diff --git a/test/bthread_setconcurrency_unittest.cpp 
b/test/bthread_setconcurrency_unittest.cpp
index 9e2e50d1..0843918f 100644
--- a/test/bthread_setconcurrency_unittest.cpp
+++ b/test/bthread_setconcurrency_unittest.cpp
@@ -214,7 +214,6 @@ int concurrency_by_tag(int num) {
 
 TEST(BthreadTest, concurrency_by_tag) {
     ASSERT_EQ(concurrency_by_tag(1), false);
-    auto tag_con = bthread_getconcurrency_by_tag(0);
     auto con = bthread_getconcurrency();
     ASSERT_EQ(concurrency_by_tag(con), true);
     ASSERT_EQ(concurrency_by_tag(con + 1), true);


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to