This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 5b0a6c1b epoll bthread deal first (#2819)
5b0a6c1b is described below
commit 5b0a6c1bdaf1b15974e5d1deb914e9ea025a4de5
Author: Jade <[email protected]>
AuthorDate: Thu May 22 10:14:06 2025 +0800
epoll bthread deal first (#2819)
* add global priority queue
* add parking lot license
* fix scale workers
* fix expected state
* add global priority queue
* fix setconcurrency ut error
* fix rebase error
* add global priority bthread flags
* deal some comments
* fix comments
---------
Co-authored-by: jiazheng.jia <[email protected]>
---
src/brpc/event_dispatcher_epoll.cpp | 7 ++++---
src/bthread/parking_lot.h | 1 +
src/bthread/task_control.cpp | 10 ++++++++++
src/bthread/task_control.h | 5 +++++
src/bthread/task_group.cpp | 18 +++++++++++++++---
src/bthread/task_group.h | 2 ++
src/bthread/task_group_inl.h | 2 +-
src/bthread/types.h | 5 +++++
test/bthread_setconcurrency_unittest.cpp | 1 -
9 files changed, 43 insertions(+), 8 deletions(-)
diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp
index 0ea404ff..005119f7 100644
--- a/src/brpc/event_dispatcher_epoll.cpp
+++ b/src/brpc/event_dispatcher_epoll.cpp
@@ -26,7 +26,7 @@ EventDispatcher::EventDispatcher()
: _event_dispatcher_fd(-1)
, _stop(false)
, _tid(0)
- , _thread_attr(BTHREAD_ATTR_NORMAL) {
+ , _thread_attr(BTHREAD_ATTR_EPOLL) {
_event_dispatcher_fd = epoll_create(1024 * 1024);
if (_event_dispatcher_fd < 0) {
PLOG(FATAL) << "Fail to create epoll";
@@ -69,8 +69,9 @@ int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
// Set _thread_attr before creating epoll thread to make sure
// everyting seems sane to the thread.
- _thread_attr = consumer_thread_attr ?
- *consumer_thread_attr : BTHREAD_ATTR_NORMAL;
+ if (consumer_thread_attr) {
+ _thread_attr = *consumer_thread_attr | BTHREAD_GLOBAL_PRIORITY;
+ }
//_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it will cause new bthread
// that created by epoll_wait() never to quit.
diff --git a/src/bthread/parking_lot.h b/src/bthread/parking_lot.h
index 9bb48ad7..620e3c89 100644
--- a/src/bthread/parking_lot.h
+++ b/src/bthread/parking_lot.h
@@ -70,6 +70,7 @@ public:
_pending_signal.fetch_or(1);
futex_wake_private(&_pending_signal, 10000);
}
+
private:
// higher 31 bits for signalling, LSB for stopping.
butil::atomic<int> _pending_signal;
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 545bc322..47b2199b 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -183,6 +183,7 @@ TaskControl::TaskControl()
, _signal_per_second(&_cumulated_signal_count)
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
+ , _priority_queues(FLAGS_task_group_ntags)
, _pl(FLAGS_task_group_ntags)
{}
@@ -207,6 +208,10 @@ int TaskControl::init(int concurrency) {
_tagged_worker_usage_second.push_back(new
bvar::PerSecond<bvar::PassiveStatus<double>>(
"bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i],
1));
_tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count",
tag_str));
+ if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
+ LOG(FATAL) << "Fail to init _priority_q";
+ return -1;
+ }
}
// Make sure TimerThread is ready.
@@ -429,6 +434,11 @@ int TaskControl::_destroy_group(TaskGroup* g) {
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
auto tag = tls_task_group->tag();
+
+ if (_priority_queues[tag].steal(tid)) {
+ return true;
+ }
+
// 1: Acquiring fence is paired with releasing fence in _add_group to
// avoid accessing uninitialized slot of _groups.
const size_t ngroup =
tag_ngroup(tag).load(butil::memory_order_acquire/*1*/);
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 95820a86..2a2b76d6 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -97,6 +97,10 @@ public:
std::string stack_trace(bthread_t tid);
#endif // BRPC_BTHREAD_TRACER
+ void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
+ _priority_queues[tag].push(tid);
+ }
+
private:
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
static const int PARKING_LOT_NUM = 4;
@@ -153,6 +157,7 @@ private:
std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;
std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*>
_tagged_worker_usage_second;
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
+ std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
std::vector<TaggedParkingLot> _pl;
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 6abc444b..c18f7507 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -482,12 +482,19 @@ int TaskGroup::start_foreground(TaskGroup** pg,
} else {
// NOSIGNAL affects current task, not the new task.
RemainedFn fn = NULL;
- if (g->current_task()->about_to_quit) {
+ auto& cur_attr = g->_cur_meta->attr;
+ if (cur_attr.flags & BTHREAD_GLOBAL_PRIORITY) {
+ fn = priority_to_run;
+ } else if (g->current_task()->about_to_quit) {
fn = ready_to_run_in_worker_ignoresignal;
} else {
fn = ready_to_run_in_worker;
}
- ReadyToRunArgs args = { g->_cur_meta, (bool)(using_attr.flags &
BTHREAD_NOSIGNAL) };
+ ReadyToRunArgs args = {
+ g->tag(),
+ g->_cur_meta,
+ (bool)(using_attr.flags & BTHREAD_NOSIGNAL)
+ };
g->set_remained(fn, &args);
TaskGroup::sched_to(pg, m->tid);
}
@@ -861,6 +868,11 @@ void TaskGroup::ready_to_run_in_worker_ignoresignal(void*
args_in) {
return tls_task_group->push_rq(args->meta->tid);
}
+void TaskGroup::priority_to_run(void* args_in) {
+ ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
+ return tls_task_group->control()->push_priority_queue(args->tag, args->meta->tid);
+}
+
struct SleepArgs {
uint64_t timeout_us;
bthread_t tid;
@@ -1035,7 +1047,7 @@ int TaskGroup::interrupt(bthread_t tid, TaskControl* c,
bthread_tag_t tag) {
void TaskGroup::yield(TaskGroup** pg) {
TaskGroup* g = *pg;
- ReadyToRunArgs args = { g->_cur_meta, false };
+ ReadyToRunArgs args = { g->tag(), g->_cur_meta, false };
g->set_remained(ready_to_run_in_worker, &args);
sched(pg);
}
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index 488ad492..ccba9ba3 100644
--- a/src/bthread/task_group.h
+++ b/src/bthread/task_group.h
@@ -221,11 +221,13 @@ friend class TaskControl;
static void _release_last_context(void*);
static void _add_sleep_event(void*);
struct ReadyToRunArgs {
+ bthread_tag_t tag;
TaskMeta* meta;
bool nosignal;
};
static void ready_to_run_in_worker(void*);
static void ready_to_run_in_worker_ignoresignal(void*);
+ static void priority_to_run(void*);
// Wait for a task to run.
// Returns true on success, false is treated as permanent error and the
diff --git a/src/bthread/task_group_inl.h b/src/bthread/task_group_inl.h
index 4842bf69..aec1a284 100644
--- a/src/bthread/task_group_inl.h
+++ b/src/bthread/task_group_inl.h
@@ -51,7 +51,7 @@ inline void TaskGroup::exchange(TaskGroup** pg, TaskMeta*
next_meta) {
if (g->is_current_pthread_task()) {
return g->ready_to_run(next_meta);
}
- ReadyToRunArgs args = { g->_cur_meta, false };
+ ReadyToRunArgs args = { g->tag(), g->_cur_meta, false };
g->set_remained((g->current_task()->about_to_quit
? ready_to_run_in_worker_ignoresignal
: ready_to_run_in_worker),
diff --git a/src/bthread/types.h b/src/bthread/types.h
index 60f5c7fe..c0f23f1c 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -53,6 +53,7 @@ static const bthread_attrflags_t BTHREAD_LOG_CONTEXT_SWITCH =
16;
static const bthread_attrflags_t BTHREAD_NOSIGNAL = 32;
static const bthread_attrflags_t BTHREAD_NEVER_QUIT = 64;
static const bthread_attrflags_t BTHREAD_INHERIT_SPAN = 128;
+static const bthread_attrflags_t BTHREAD_GLOBAL_PRIORITY = 256;
// Key of thread-local data, created by bthread_key_create.
typedef struct {
@@ -137,6 +138,10 @@ static const bthread_attr_t BTHREAD_ATTR_NORMAL =
{BTHREAD_STACKTYPE_NORMAL, 0,
static const bthread_attr_t BTHREAD_ATTR_LARGE = {BTHREAD_STACKTYPE_LARGE, 0,
NULL,
BTHREAD_TAG_INVALID};
+// epoll bthread
+static const bthread_attr_t BTHREAD_ATTR_EPOLL = {
+ BTHREAD_STACKTYPE_NORMAL, BTHREAD_GLOBAL_PRIORITY, NULL,
BTHREAD_TAG_INVALID};
+
// bthreads created with this attribute will print log when it's started,
// context-switched, finished.
static const bthread_attr_t BTHREAD_ATTR_DEBUG = {
diff --git a/test/bthread_setconcurrency_unittest.cpp b/test/bthread_setconcurrency_unittest.cpp
index 9e2e50d1..0843918f 100644
--- a/test/bthread_setconcurrency_unittest.cpp
+++ b/test/bthread_setconcurrency_unittest.cpp
@@ -214,7 +214,6 @@ int concurrency_by_tag(int num) {
TEST(BthreadTest, concurrency_by_tag) {
ASSERT_EQ(concurrency_by_tag(1), false);
- auto tag_con = bthread_getconcurrency_by_tag(0);
auto con = bthread_getconcurrency();
ASSERT_EQ(concurrency_by_tag(con), true);
ASSERT_EQ(concurrency_by_tag(con + 1), true);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]