This is an automated email from the ASF dual-hosted git repository.
chenBright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 2ceefba8 fix(bthread): refactor sharded priority queue with per-ED shard (#3270)
2ceefba8 is described below
commit 2ceefba8d5afaf19ab003071523965affe5d5c40
Author: Enrico <[email protected]>
AuthorDate: Sun Jun 14 13:46:38 2026 +0800
fix(bthread): refactor sharded priority queue with per-ED shard (#3270)
* fix(bthread): refactor sharded priority queue with per-ED shard
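Each EventDispatcher gets its own WorkStealingQueue. Each queue is pushed to
only on behalf of its own ED (single producer) and drained by stealing
workers (multiple consumers). Pushes from different EDs never touch the same
queue, so no spinlock is needed.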
fix: address review comments on the redundant namespace
fix: unit-test failure and the -1 issue
* fix: address review comments about renaming the priority queue
---------
Co-authored-by: yannan.wyn <[email protected]>
---
src/brpc/event_dispatcher.cpp | 6 +-
src/brpc/event_dispatcher.h | 4 +
src/brpc/event_dispatcher_epoll.cpp | 8 +-
src/bthread/task_control.cpp | 44 ++++--
src/bthread/task_control.h | 18 ++-
src/bthread/task_group.cpp | 9 +-
src/bthread/task_meta.h | 2 +
test/bthread_priority_queue_unittest.cpp | 228 +++++++++++++++++++++++++++++++
8 files changed, 304 insertions(+), 15 deletions(-)
diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp
index d4316bef..15152fc8 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher.cpp
@@ -23,13 +23,14 @@
#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
#include "bvar/latency_recorder.h" // bvar::LatencyRecorder
#include "bthread/bthread.h" //
bthread_start_background
+#include "bthread/task_group.h" //
TaskGroup::address_meta
#include "brpc/event_dispatcher.h"
DECLARE_int32(task_group_ntags);
-namespace brpc {
+// Defined in the global namespace by bthread/task_control.cpp, which also
+// uses it to size the per-ED priority queues.
+DECLARE_int32(event_dispatcher_num);
-DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
+namespace brpc {
DEFINE_bool(event_dispatcher_edisp_unsched, false,
"Disable event dispatcher schedule");
@@ -66,6 +67,7 @@ void InitializeGlobalDispatchers() {
bthread_attr_t attr =
FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD :
BTHREAD_ATTR_NORMAL;
attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;
+ // j is this dispatcher's index within its tag. It must be set before
+ // Start(), because RunThis() copies it into the dispatcher bthread's meta
+ // to select that ED's own priority queue.
+ g_edisp[i * FLAGS_event_dispatcher_num + j].set_priority_index(j);
CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num +
j].Start(&attr));
}
}
diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h
index 3fdc9f17..d95243ac 100644
--- a/src/brpc/event_dispatcher.h
+++ b/src/brpc/event_dispatcher.h
@@ -114,6 +114,8 @@ public:
// Stop bthread of this dispatcher.
void Stop();
+ // Set this dispatcher's index among the dispatchers of its tag. Call it
+ // before Start(): RunThis() reads the value once, when the dispatcher
+ // bthread begins, and stores it in that bthread's TaskMeta.
+ void set_priority_index(int idx) { _priority_index = idx; }
+
// Suspend calling thread until bthread of this dispatcher stops.
void Join();
@@ -188,6 +190,8 @@ private:
// Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
int _wakeup_fds[2];
+
+ // Index of this dispatcher within its tag. -1 (default) means unassigned,
+ // in which case RunThis() leaves the bthread's priority_index untouched.
+ int _priority_index{-1};
};
EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);
diff --git a/src/brpc/event_dispatcher_epoll.cpp
b/src/brpc/event_dispatcher_epoll.cpp
index 5a6c23b0..31f60aac 100644
--- a/src/brpc/event_dispatcher_epoll.cpp
+++ b/src/brpc/event_dispatcher_epoll.cpp
@@ -190,7 +190,13 @@ int EventDispatcher::RemoveConsumer(int fd) {
}
void* EventDispatcher::RunThis(void* arg) {
- ((EventDispatcher*)arg)->Run();
+ EventDispatcher* ed = (EventDispatcher*)arg;
+ // Publish this dispatcher's shard index in its own TaskMeta.
+ // TaskGroup::priority_to_run reads meta->priority_index to choose the
+ // per-ED priority queue when this bthread is preempted. An unassigned
+ // index (-1) leaves the meta at its default.
+ if (ed->_priority_index >= 0) {
+ bthread::TaskMeta* meta =
+ bthread::TaskGroup::address_meta(bthread_self());
+ meta->priority_index = ed->_priority_index;
+ }
+ ed->Run();
return NULL;
}
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index ba067e39..347dbd24 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -51,6 +51,8 @@ DEFINE_string(cpu_set, "",
"Set of CPUs to which cores are bound. "
"for example, 0-3,5,7; default: disable");
+// Defined here, in the global namespace, so that TaskControl can size one
+// priority queue per EventDispatcher per tag; brpc only DECLAREs it.
+DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
+
namespace bthread {
DEFINE_bool(parking_lot_no_signal_when_no_waiter, false,
@@ -205,11 +207,31 @@ TaskControl::TaskControl()
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
, _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
- , _priority_queues(FLAGS_task_group_ntags)
+ // One priority queue per (tag, ED index) pair, sized from the flag values
+ // at construction time. NOTE(review): a non-positive event_dispatcher_num
+ // is not rejected before the product is used as a vector size.
+ , _ed_priority_queue_num_of_each_tag(FLAGS_event_dispatcher_num)
+ , _ed_priority_queues(
+ FLAGS_task_group_ntags * FLAGS_event_dispatcher_num)
, _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
, _tagged_pl(FLAGS_task_group_ntags)
{}
+// Initializes one WorkStealingQueue (capacity BTHREAD_MAX_CONCURRENCY) for
+// every (tag, ED index) pair. Does nothing when the priority queue is
+// disabled. Returns 0 on success, or -1 after logging the first failure.
+int TaskControl::init_ed_priority_queues() {
+ if (!_enable_priority_queue) {
+ return 0;
+ }
+ for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
+ for (int j = 0;
+ j < _ed_priority_queue_num_of_each_tag; ++j) {
+ if (ed_priority_queue(i, j).init(
+ BTHREAD_MAX_CONCURRENCY) != 0) {
+ LOG(ERROR) << "Fail to init priority queue for tag=" << i
+ << " ed=" << j;
+ return -1;
+ }
+ }
+ }
+ return 0;
+}
+
int TaskControl::init(int concurrency) {
if (_concurrency != 0) {
LOG(ERROR) << "Already initialized";
@@ -238,10 +260,10 @@ int TaskControl::init(int concurrency) {
_tagged_worker_usage_second.push_back(new
bvar::PerSecond<bvar::PassiveStatus<double>>(
"bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i],
1));
_tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count",
tag_str));
- if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
- LOG(ERROR) << "Fail to init _priority_q";
- return -1;
- }
+ }
+
+ // Initialize the per-ED priority queues of all tags in one pass.
+ if (init_ed_priority_queues() != 0) {
+ return -1;
}
// Make sure TimerThread is ready.
@@ -445,7 +467,7 @@ TaskControl::~TaskControl() {
_switch_per_second.hide();
_signal_per_second.hide();
_status.hide();
-
+
stop_and_join();
}
@@ -528,8 +550,13 @@ int TaskControl::_destroy_group(TaskGroup* g) {
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
auto tag = tls_task_group->tag();
- if (_priority_queues[tag].steal(tid)) {
- return true;
+ // Poll this tag's per-ED priority queues before stealing from other
+ // groups. The queues are scanned in index order, so lower-indexed EDs are
+ // always tried first. The scan is skipped when the priority queue is
+ // disabled, because init_ed_priority_queues() does not initialize the
+ // queues in that case.
+ if (_enable_priority_queue) {
+ for (int i = 0;
+ i < _ed_priority_queue_num_of_each_tag; ++i) {
+ if (ed_priority_queue(tag, i).steal(tid)) {
+ return true;
+ }
+ }
}
// 1: Acquiring fence is paired with releasing fence in _add_group to
@@ -689,4 +716,5 @@ std::vector<bthread_t> TaskControl::get_living_bthreads() {
return living_bthread_ids;
}
+
} // namespace bthread
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 4480daa6..93336199 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -101,11 +101,13 @@ public:
std::string stack_trace(bthread_t tid);
#endif // BRPC_BTHREAD_TRACER
- void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
- _priority_queues[tag].push(tid);
+ // Push `tid` onto the priority queue of dispatcher `priority_index` in
+ // `tag`. The per-ED design relies on each queue having a single producer,
+ // so only push on behalf of that queue's own ED.
+ // NOTE(review): no bounds check here; callers must guarantee
+ // 0 <= priority_index < _ed_priority_queue_num_of_each_tag.
+ void push_ed_priority_queue(
+ bthread_tag_t tag, int priority_index, bthread_t tid) {
+ ed_priority_queue(tag, priority_index).push(tid);
}
std::vector<bthread_t> get_living_bthreads();
+
private:
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;
@@ -123,6 +125,15 @@ private:
// Tag parking slot
TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; }
+ // Priority queue of dispatcher `index` within `tag`. Queues are laid out
+ // tag-major: all ED queues of tag 0 come first, then those of tag 1, etc.
+ WorkStealingQueue<bthread_t>& ed_priority_queue(
+ bthread_tag_t tag, int index) {
+ return _ed_priority_queues[
+ tag * _ed_priority_queue_num_of_each_tag + index];
+ }
+
+ // Initialize every per-ED priority queue; returns 0 on success, -1 on error.
+ int init_ed_priority_queues();
+
static void delete_task_group(void* arg);
static void* worker_thread(void* task_control);
@@ -164,7 +175,8 @@ private:
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
bool _enable_priority_queue;
- std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
+ // Number of per-ED priority queues per tag (event_dispatcher_num as read
+ // by the constructor).
+ int _ed_priority_queue_num_of_each_tag;
+ std::vector<WorkStealingQueue<bthread_t>> _ed_priority_queues;
size_t _pl_num_of_each_tag;
std::vector<TaggedParkingLot> _tagged_pl;
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 4706b7f7..c0804c9a 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -526,6 +526,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
m->tid = make_tid(*m->version_butex, slot);
+ m->priority_index = pg ? (*pg)->_cur_meta->priority_index : -1;
*th = m->tid;
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
@@ -595,6 +596,11 @@ int TaskGroup::start_background(bthread_t* __restrict th,
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
m->tid = make_tid(*m->version_butex, slot);
+ // Inherit the ED shard index only from the bthread running on this group.
+ // On a remote start (the caller is not running on this group), _cur_meta
+ // is an unrelated bthread owned by another worker: reading it races with
+ // that worker and could route this bthread into another ED's queue.
+ m->priority_index = (tls_task_group == this) ? _cur_meta->priority_index : -1;
*th = m->tid;
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
@@ -674,6 +680,7 @@ void TaskGroup::ending_sched(TaskGroup** pg) {
TaskGroup* g = *pg;
bthread_t next_tid = 0;
// Find next task to run, if none, switch to idle thread of the group.
+
#ifndef BTHREAD_FAIR_WSQ
// When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
// WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
@@ -942,7 +949,17 @@ void TaskGroup::priority_to_run(void* args_in) {
tls_task_group->_control->_task_tracer.set_status(
TASK_STATUS_READY, args->meta);
#endif // BRPC_BTHREAD_TRACER
- return tls_task_group->control()->push_priority_queue(args->tag, args->meta->tid);
+ TaskControl* c = tls_task_group->control();
+ const int idx = args->meta->priority_index;
+ // Fall back to the local run queue when this bthread is not bound to an
+ // ED shard (idx < 0) or when idx does not address an initialized shard:
+ // priority queues disabled, or event_dispatcher_num changed after
+ // TaskControl sized its queues.
+ if (idx < 0 || !c->_enable_priority_queue ||
+ idx >= c->_ed_priority_queue_num_of_each_tag) {
+ return tls_task_group->push_rq(args->meta->tid);
+ }
+ return c->push_ed_priority_queue(args->tag, idx, args->meta->tid);
}
struct SleepArgs {
diff --git a/src/bthread/task_meta.h b/src/bthread/task_meta.h
index a2490b45..074af430 100644
--- a/src/bthread/task_meta.h
+++ b/src/bthread/task_meta.h
@@ -91,6 +91,8 @@ struct TaskMeta {
// simplified if they can get tid from TaskMeta.
bthread_t tid{INVALID_BTHREAD};
+ // Index of the per-ED priority queue (within this bthread's tag) that
+ // TaskGroup::priority_to_run pushes this bthread to. -1 (default) means
+ // not bound to an EventDispatcher; such bthreads go to the local run
+ // queue instead. Set by EventDispatcher::RunThis, and it may be inherited
+ // by bthreads started from this one (see start_foreground/start_background).
+ int priority_index{-1};
+
// User function and argument
void* (*fn)(void*){NULL};
void* arg{NULL};
diff --git a/test/bthread_priority_queue_unittest.cpp b/test/bthread_priority_queue_unittest.cpp
new file mode 100644
index 00000000..d6c9e43a
--- /dev/null
+++ b/test/bthread_priority_queue_unittest.cpp
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+#include <atomic>
+#include <vector>
+#include <set>
+#include <mutex>
+#include <sched.h>
+#include "bthread/bthread.h"
+#include "bthread/task_group.h"
+
+namespace {
+
+// Counter incremented by priority bthreads to verify execution.
+std::atomic<int> g_priority_count(0);
+// Mutex + set collecting the ids of executed tasks to verify none is lost.
+std::mutex g_tid_mutex;
+std::set<int> g_executed_ids;
+
+void reset_globals() {
+    g_priority_count.store(0);
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    g_executed_ids.clear();
+}
+
+struct TaskArg {
+    int id;
+};
+
+// Counts itself, records its id and frees its heap-allocated argument.
+void* priority_task_fn(void* arg) {
+    TaskArg* ta = static_cast<TaskArg*>(arg);
+    g_priority_count.fetch_add(1, std::memory_order_relaxed);
+    {
+        std::lock_guard<std::mutex> lk(g_tid_mutex);
+        g_executed_ids.insert(ta->id);
+    }
+    delete ta;
+    return NULL;
+}
+
+void* normal_task_fn(void* /*arg*/) {
+    // Just a normal task that does nothing, used as a filler
+    bthread_usleep(1000);
+    return NULL;
+}
+
+class PriorityQueueTest : public ::testing::Test {
+protected:
+    static void SetUpTestSuite() {
+        // TaskControl reads both flags in its constructor, so they only take
+        // effect if no bthread has been created in this process yet
+        // (presumably TaskControl is created lazily with the first bthread).
+        google::SetCommandLineOption("enable_bthread_priority_queue", "true");
+        google::SetCommandLineOption("event_dispatcher_num", "4");
+    }
+    void SetUp() override {
+        reset_globals();
+    }
+};
+
+// Test 1: End-to-end priority task submission and execution.
+// Submit many priority tasks and verify each one runs exactly once.
+TEST_F(PriorityQueueTest, e2e_priority_tasks_all_executed) {
+    const int N = 200;
+
+    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+    attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::vector<bthread_t> tids(N);
+    for (int i = 0; i < N; ++i) {
+        TaskArg* arg = new TaskArg{i};
+        ASSERT_EQ(0, bthread_start_background(&tids[i], &attr,
+                                              priority_task_fn, arg));
+    }
+
+    for (int i = 0; i < N; ++i) {
+        ASSERT_EQ(0, bthread_join(tids[i], NULL));
+    }
+
+    ASSERT_EQ(N, g_priority_count.load());
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    ASSERT_EQ((size_t)N, g_executed_ids.size());
+    for (int i = 0; i < N; ++i) {
+        ASSERT_TRUE(g_executed_ids.count(i)) << "Missing task id=" << i;
+    }
+}
+
+// Test 2: Mix of priority and normal tasks, all complete correctly.
+TEST_F(PriorityQueueTest, mixed_priority_and_normal_tasks) {
+    const int N_PRIORITY = 100;
+    const int N_NORMAL = 100;
+
+    bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL;
+    priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::vector<bthread_t> tids;
+    tids.reserve(N_PRIORITY + N_NORMAL);
+
+    for (int i = 0; i < N_PRIORITY + N_NORMAL; ++i) {
+        bthread_t tid;
+        if (i % 2 == 0 && (i / 2) < N_PRIORITY) {
+            TaskArg* arg = new TaskArg{i / 2};
+            ASSERT_EQ(0, bthread_start_background(&tid, &priority_attr,
+                                                  priority_task_fn, arg));
+        } else {
+            ASSERT_EQ(0, bthread_start_background(&tid, NULL,
+                                                  normal_task_fn, NULL));
+        }
+        tids.push_back(tid);
+    }
+
+    for (auto tid : tids) {
+        ASSERT_EQ(0, bthread_join(tid, NULL));
+    }
+
+    ASSERT_EQ(N_PRIORITY, g_priority_count.load());
+}
+
+// Test 3: start_foreground (bthread_start_urgent) with GLOBAL_PRIORITY.
+// Simulates ED calling StartInputEvent: ED bthread calls start_urgent,
+// gets preempted into PQ via priority_to_run, child runs and ends,
+// ending_sched steals ED from PQ to resume.
+// Once in the PQ, the ED may be stolen and resumed by another worker while
+// its latest child is still running, so the children are joined explicitly
+// before the counters are checked.
+TEST_F(PriorityQueueTest, start_foreground_priority_to_run) {
+    const int N = 200;
+
+    struct EDSimArg {
+        int n_tasks;
+    };
+    EDSimArg ed_arg{N};
+
+    auto ed_fn = [](void* arg) -> void* {
+        EDSimArg* ea = static_cast<EDSimArg*>(arg);
+        bthread::TaskMeta* meta =
+            bthread::TaskGroup::address_meta(bthread_self());
+        meta->priority_index = 0;
+
+        std::vector<bthread_t> children;
+        children.reserve(ea->n_tasks);
+        for (int i = 0; i < ea->n_tasks; ++i) {
+            TaskArg* ta = new TaskArg{i};
+            bthread_t child;
+            if (bthread_start_urgent(&child, NULL, priority_task_fn, ta) != 0) {
+                ADD_FAILURE() << "bthread_start_urgent failed, task id=" << i;
+                delete ta;
+                continue;
+            }
+            children.push_back(child);
+        }
+        for (auto c : children) {
+            bthread_join(c, NULL);
+        }
+        return NULL;
+    };
+
+    bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL;
+    priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    bthread_t ed_tid;
+    ASSERT_EQ(0, bthread_start_background(&ed_tid, &priority_attr,
+                                          ed_fn, &ed_arg));
+    ASSERT_EQ(0, bthread_join(ed_tid, NULL));
+
+    ASSERT_EQ(N, g_priority_count.load());
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    ASSERT_EQ((size_t)N, g_executed_ids.size());
+}
+
+// Test 4: Multiple ED-like bthreads concurrently calling start_urgent.
+// Verifies PQ correctness under concurrent preemption from multiple EDs.
+// Each simulated ED uses its own priority_index (0..NUM_EDS-1). That index
+// addresses a real shard only if event_dispatcher_num >= NUM_EDS when
+// TaskControl was constructed (see SetUpTestSuite).
+TEST_F(PriorityQueueTest, multiple_eds_concurrent_preempt) {
+    const int NUM_EDS = 4;
+    const int TASKS_PER_ED = 50;
+    const int TOTAL = NUM_EDS * TASKS_PER_ED;
+    std::atomic<int> resume_count(0);
+
+    struct EDArg {
+        int ed_index;
+        int n_children;
+        std::atomic<int>* resume_count;
+    };
+
+    auto ed_fn = [](void* arg) -> void* {
+        EDArg* ea = static_cast<EDArg*>(arg);
+        bthread::TaskMeta* meta =
+            bthread::TaskGroup::address_meta(bthread_self());
+        meta->priority_index = ea->ed_index;
+
+        std::vector<bthread_t> children;
+        children.reserve(ea->n_children);
+        for (int i = 0; i < ea->n_children; ++i) {
+            int id = ea->ed_index * ea->n_children + i;
+            TaskArg* ta = new TaskArg{id};
+            bthread_t child;
+            if (bthread_start_urgent(&child, NULL, priority_task_fn, ta) != 0) {
+                ADD_FAILURE() << "bthread_start_urgent failed, task id=" << id;
+                delete ta;
+                continue;
+            }
+            children.push_back(child);
+            // Reaching here means this ED was resumed after the urgent child
+            // took over its worker.
+            ea->resume_count->fetch_add(1, std::memory_order_relaxed);
+        }
+        for (auto c : children) {
+            bthread_join(c, NULL);
+        }
+        return NULL;
+    };
+
+    bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL;
+    priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::vector<EDArg> ed_args(NUM_EDS);
+    std::vector<bthread_t> ed_tids(NUM_EDS);
+    for (int i = 0; i < NUM_EDS; ++i) {
+        ed_args[i] = {i, TASKS_PER_ED, &resume_count};
+        ASSERT_EQ(0, bthread_start_background(&ed_tids[i], &priority_attr,
+                                              ed_fn, &ed_args[i]));
+    }
+
+    for (int i = 0; i < NUM_EDS; ++i) {
+        ASSERT_EQ(0, bthread_join(ed_tids[i], NULL));
+    }
+
+    ASSERT_EQ(TOTAL, g_priority_count.load());
+    ASSERT_EQ(TOTAL, resume_count.load());
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    ASSERT_EQ((size_t)TOTAL, g_executed_ids.size());
+}
+
+}  // namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]