This is an automated email from the ASF dual-hosted git repository.

chenBright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ceefba8 fix(bthread): refactor sharded priority queue with per-ED 
shard (#3270)
2ceefba8 is described below

commit 2ceefba8d5afaf19ab003071523965affe5d5c40
Author: Enrico <[email protected]>
AuthorDate: Sun Jun 14 13:46:38 2026 +0800

    fix(bthread): refactor sharded priority queue with per-ED shard (#3270)
    
    * fix(bthread): refactor sharded priority queue with per-ED shard
    
    Each EventDispatcher gets its own WorkStealingQueue, making concurrent
    push from multiple EDs naturally SPMC-safe without spinlocks.
    
    fix: comments of redundant namespace
    
    fix: ut failure and -1 issue
    
    * fix: comments about priority pq renaming
    
    ---------
    
    Co-authored-by: yannan.wyn <[email protected]>
---
 src/brpc/event_dispatcher.cpp            |   6 +-
 src/brpc/event_dispatcher.h              |   4 +
 src/brpc/event_dispatcher_epoll.cpp      |   8 +-
 src/bthread/task_control.cpp             |  44 ++++--
 src/bthread/task_control.h               |  18 ++-
 src/bthread/task_group.cpp               |   9 +-
 src/bthread/task_meta.h                  |   2 +
 test/bthread_priority_queue_unittest.cpp | 228 +++++++++++++++++++++++++++++++
 8 files changed, 304 insertions(+), 15 deletions(-)

diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp
index d4316bef..15152fc8 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher.cpp
@@ -23,13 +23,14 @@
 #include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
 #include "bvar/latency_recorder.h"                    // bvar::LatencyRecorder
 #include "bthread/bthread.h"                          // 
bthread_start_background
+#include "bthread/task_group.h"                        // 
TaskGroup::address_meta
 #include "brpc/event_dispatcher.h"
 
 DECLARE_int32(task_group_ntags);
 
-namespace brpc {
+DECLARE_int32(event_dispatcher_num);
 
-DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
+namespace brpc {
 DEFINE_bool(event_dispatcher_edisp_unsched, false,
             "Disable event dispatcher schedule");
 
@@ -66,6 +67,7 @@ void InitializeGlobalDispatchers() {
             bthread_attr_t attr =
                 FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : 
BTHREAD_ATTR_NORMAL;
             attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;
+            g_edisp[i * FLAGS_event_dispatcher_num + j].set_priority_index(j);
             CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + 
j].Start(&attr));
         }
     }
diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h
index 3fdc9f17..d95243ac 100644
--- a/src/brpc/event_dispatcher.h
+++ b/src/brpc/event_dispatcher.h
@@ -114,6 +114,8 @@ public:
     // Stop bthread of this dispatcher.
     void Stop();
 
+    void set_priority_index(int idx) { _priority_index = idx; }
+
     // Suspend calling thread until bthread of this dispatcher stops.
     void Join();
 
@@ -188,6 +190,8 @@ private:
 
     // Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
     int _wakeup_fds[2];
+
+    int _priority_index{-1};
 };
 
 EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);
diff --git a/src/brpc/event_dispatcher_epoll.cpp 
b/src/brpc/event_dispatcher_epoll.cpp
index 5a6c23b0..31f60aac 100644
--- a/src/brpc/event_dispatcher_epoll.cpp
+++ b/src/brpc/event_dispatcher_epoll.cpp
@@ -190,7 +190,13 @@ int EventDispatcher::RemoveConsumer(int fd) {
 }
 
 void* EventDispatcher::RunThis(void* arg) {
-    ((EventDispatcher*)arg)->Run();
+    EventDispatcher* ed = (EventDispatcher*)arg;
+    if (ed->_priority_index >= 0) {
+        bthread::TaskMeta* meta =
+            bthread::TaskGroup::address_meta(bthread_self());
+        meta->priority_index = ed->_priority_index;
+    }
+    ed->Run();
     return NULL;
 }
 
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index ba067e39..347dbd24 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -51,6 +51,8 @@ DEFINE_string(cpu_set, "",
               "Set of CPUs to which cores are bound. "
               "for example, 0-3,5,7; default: disable");
 
+DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
+
 namespace bthread {
 
 DEFINE_bool(parking_lot_no_signal_when_no_waiter, false,
@@ -205,11 +207,31 @@ TaskControl::TaskControl()
     , _status(print_rq_sizes_in_the_tc, this)
     , _nbthreads("bthread_count")
     , _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
-    , _priority_queues(FLAGS_task_group_ntags)
+    , _ed_priority_queue_num_of_each_tag(FLAGS_event_dispatcher_num)
+    , _ed_priority_queues(
+        FLAGS_task_group_ntags * FLAGS_event_dispatcher_num)
     , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
     , _tagged_pl(FLAGS_task_group_ntags)
 {}
 
+int TaskControl::init_ed_priority_queues() {
+    if (!_enable_priority_queue) {
+        return 0;
+    }
+    for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
+        for (int j = 0;
+             j < _ed_priority_queue_num_of_each_tag; ++j) {
+            if (ed_priority_queue(i, j).init(
+                    BTHREAD_MAX_CONCURRENCY) != 0) {
+                LOG(ERROR) << "Fail to init priority queue for tag=" << i
+                           << " ed=" << j;
+                return -1;
+            }
+        }
+    }
+    return 0;
+}
+
 int TaskControl::init(int concurrency) {
     if (_concurrency != 0) {
         LOG(ERROR) << "Already initialized";
@@ -238,10 +260,10 @@ int TaskControl::init(int concurrency) {
         _tagged_worker_usage_second.push_back(new 
bvar::PerSecond<bvar::PassiveStatus<double>>(
             "bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 
1));
         _tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", 
tag_str));
-        if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
-            LOG(ERROR) << "Fail to init _priority_q";
-            return -1;
-        }
+    }
+
+    if (init_ed_priority_queues() != 0) {
+        return -1;
     }
 
     // Make sure TimerThread is ready.
@@ -445,7 +467,7 @@ TaskControl::~TaskControl() {
     _switch_per_second.hide();
     _signal_per_second.hide();
     _status.hide();
-    
+
     stop_and_join();
 }
 
@@ -528,8 +550,13 @@ int TaskControl::_destroy_group(TaskGroup* g) {
 bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
     auto tag = tls_task_group->tag();
 
-    if (_priority_queues[tag].steal(tid)) {
-        return true;
+    if (_enable_priority_queue) {
+        for (int i = 0;
+             i < _ed_priority_queue_num_of_each_tag; ++i) {
+            if (ed_priority_queue(tag, i).steal(tid)) {
+                return true;
+            }
+        }
     }
 
     // 1: Acquiring fence is paired with releasing fence in _add_group to
@@ -689,4 +716,5 @@ std::vector<bthread_t> TaskControl::get_living_bthreads() {
     return living_bthread_ids;
 }
 
+
 }  // namespace bthread
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 4480daa6..93336199 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -101,11 +101,13 @@ public:
     std::string stack_trace(bthread_t tid);
 #endif // BRPC_BTHREAD_TRACER
 
-    void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
-        _priority_queues[tag].push(tid);
+    void push_ed_priority_queue(
+            bthread_tag_t tag, int priority_index, bthread_t tid) {
+        ed_priority_queue(tag, priority_index).push(tid);
     }
 
     std::vector<bthread_t> get_living_bthreads();
+
 private:
     typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
     typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;
@@ -123,6 +125,15 @@ private:
     // Tag parking slot
     TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; }
 
+    // Priority queue for a specific ED within a tag
+    WorkStealingQueue<bthread_t>& ed_priority_queue(
+            bthread_tag_t tag, int index) {
+        return _ed_priority_queues[
+            tag * _ed_priority_queue_num_of_each_tag + index];
+    }
+
+    int init_ed_priority_queues();
+
     static void delete_task_group(void* arg);
 
     static void* worker_thread(void* task_control);
@@ -164,7 +175,8 @@ private:
     std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
 
     bool _enable_priority_queue;
-    std::vector<WorkStealingQueue<bthread_t>> _priority_queues;
+    int _ed_priority_queue_num_of_each_tag;
+    std::vector<WorkStealingQueue<bthread_t>> _ed_priority_queues;
 
     size_t _pl_num_of_each_tag;
     std::vector<TaggedParkingLot> _tagged_pl;
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 4706b7f7..c0804c9a 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -526,6 +526,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
     m->cpuwide_start_ns = start_ns;
     m->stat = EMPTY_STAT;
     m->tid = make_tid(*m->version_butex, slot);
+    m->priority_index = pg ? (*pg)->_cur_meta->priority_index : -1;
     *th = m->tid;
     if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
         LOG(INFO) << "Started bthread " << m->tid;
@@ -595,6 +596,7 @@ int TaskGroup::start_background(bthread_t* __restrict th,
     m->cpuwide_start_ns = start_ns;
     m->stat = EMPTY_STAT;
     m->tid = make_tid(*m->version_butex, slot);
+    m->priority_index = _cur_meta->priority_index;
     *th = m->tid;
     if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
         LOG(INFO) << "Started bthread " << m->tid;
@@ -674,6 +676,7 @@ void TaskGroup::ending_sched(TaskGroup** pg) {
     TaskGroup* g = *pg;
     bthread_t next_tid = 0;
     // Find next task to run, if none, switch to idle thread of the group.
+
 #ifndef BTHREAD_FAIR_WSQ
     // When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
     // WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
@@ -942,7 +945,11 @@ void TaskGroup::priority_to_run(void* args_in) {
     tls_task_group->_control->_task_tracer.set_status(
         TASK_STATUS_READY, args->meta);
 #endif // BRPC_BTHREAD_TRACER
-    return tls_task_group->control()->push_priority_queue(args->tag, 
args->meta->tid);
+    if (args->meta->priority_index < 0) {
+        return tls_task_group->push_rq(args->meta->tid);
+    }
+    return tls_task_group->control()->push_ed_priority_queue(
+        args->tag, args->meta->priority_index, args->meta->tid);
 }
 
 struct SleepArgs {
diff --git a/src/bthread/task_meta.h b/src/bthread/task_meta.h
index a2490b45..074af430 100644
--- a/src/bthread/task_meta.h
+++ b/src/bthread/task_meta.h
@@ -91,6 +91,8 @@ struct TaskMeta {
     // simplified if they can get tid from TaskMeta.
     bthread_t tid{INVALID_BTHREAD};
 
+    int priority_index{-1};
+
     // User function and argument
     void* (*fn)(void*){NULL};
     void* arg{NULL};
diff --git a/test/bthread_priority_queue_unittest.cpp 
b/test/bthread_priority_queue_unittest.cpp
new file mode 100644
index 00000000..d6c9e43a
--- /dev/null
+++ b/test/bthread_priority_queue_unittest.cpp
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+#include <atomic>
+#include <vector>
+#include <set>
+#include <mutex>
+#include <sched.h>
+#include "bthread/bthread.h"
+#include "bthread/task_group.h"
+
+namespace {
+
+// Counter incremented by priority bthreads to verify execution
+std::atomic<int> g_priority_count(0);
+// Mutex + set for collecting executed tids to verify no loss
+std::mutex g_tid_mutex;
+std::set<int> g_executed_ids;
+
+void reset_globals() {
+    g_priority_count.store(0);
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    g_executed_ids.clear();
+}
+
+struct TaskArg {
+    int id;
+};
+
+void* priority_task_fn(void* arg) {
+    TaskArg* ta = static_cast<TaskArg*>(arg);
+    g_priority_count.fetch_add(1, std::memory_order_relaxed);
+    {
+        std::lock_guard<std::mutex> lk(g_tid_mutex);
+        g_executed_ids.insert(ta->id);
+    }
+    delete ta;
+    return NULL;
+}
+
+void* normal_task_fn(void* /*arg*/) {
+    // Just a normal task that does nothing, used as a filler
+    bthread_usleep(1000);
+    return NULL;
+}
+
+class PriorityQueueTest : public ::testing::Test {
+protected:
+    static void SetUpTestSuite() {
+        google::SetCommandLineOption("enable_bthread_priority_queue", "true");
+        google::SetCommandLineOption("event_dispatcher_num", "4");
+    }
+    void SetUp() override {
+        reset_globals();
+    }
+};
+
+// Test 1: End-to-end priority task submission and execution.
+// Multiple producers submit priority tasks, verify all tasks are executed.
+TEST_F(PriorityQueueTest, e2e_priority_tasks_all_executed) {
+    const int N = 200;
+
+    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+    attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::vector<bthread_t> tids(N);
+    for (int i = 0; i < N; ++i) {
+        TaskArg* arg = new TaskArg{i};
+        ASSERT_EQ(0, bthread_start_background(&tids[i], &attr,
+                                               priority_task_fn, arg));
+    }
+
+    for (int i = 0; i < N; ++i) {
+        bthread_join(tids[i], NULL);
+    }
+
+    ASSERT_EQ(N, g_priority_count.load());
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    ASSERT_EQ((size_t)N, g_executed_ids.size());
+    for (int i = 0; i < N; ++i) {
+        ASSERT_TRUE(g_executed_ids.count(i)) << "Missing task id=" << i;
+    }
+}
+
+// Test 2: Mix of priority and normal tasks, all complete correctly.
+TEST_F(PriorityQueueTest, mixed_priority_and_normal_tasks) {
+    const int N_PRIORITY = 100;
+    const int N_NORMAL = 100;
+
+    bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL;
+    priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::vector<bthread_t> tids;
+    tids.reserve(N_PRIORITY + N_NORMAL);
+
+    for (int i = 0; i < N_PRIORITY + N_NORMAL; ++i) {
+        bthread_t tid;
+        if (i % 2 == 0 && (i / 2) < N_PRIORITY) {
+            TaskArg* arg = new TaskArg{i / 2};
+            ASSERT_EQ(0, bthread_start_background(&tid, &priority_attr,
+                                                   priority_task_fn, arg));
+        } else {
+            ASSERT_EQ(0, bthread_start_background(&tid, NULL,
+                                                   normal_task_fn, NULL));
+        }
+        tids.push_back(tid);
+    }
+
+    for (auto tid : tids) {
+        bthread_join(tid, NULL);
+    }
+
+    ASSERT_EQ(N_PRIORITY, g_priority_count.load());
+}
+
+// Test 3: start_foreground (bthread_start_urgent) with GLOBAL_PRIORITY.
+// Simulates ED calling StartInputEvent: ED bthread calls start_urgent,
+// gets preempted into PQ via priority_to_run, child runs and ends,
+// ending_sched steals ED from PQ to resume.
+TEST_F(PriorityQueueTest, start_foreground_priority_to_run) {
+    const int N = 200;
+
+    struct EDSimArg {
+        int n_tasks;
+    };
+    EDSimArg ed_arg{N};
+
+    auto ed_fn = [](void* arg) -> void* {
+        EDSimArg* ea = static_cast<EDSimArg*>(arg);
+        bthread::TaskMeta* meta =
+            bthread::TaskGroup::address_meta(bthread_self());
+        meta->priority_index = 0;
+
+        for (int i = 0; i < ea->n_tasks; ++i) {
+            TaskArg* ta = new TaskArg{i};
+            bthread_t child;
+            bthread_start_urgent(&child, NULL, priority_task_fn, ta);
+        }
+        return NULL;
+    };
+
+    bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL;
+    priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    bthread_t ed_tid;
+    ASSERT_EQ(0, bthread_start_background(&ed_tid, &priority_attr,
+                                           ed_fn, &ed_arg));
+    bthread_join(ed_tid, NULL);
+
+    ASSERT_EQ(N, g_priority_count.load());
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    ASSERT_EQ((size_t)N, g_executed_ids.size());
+}
+
+// Test 4: Multiple ED-like bthreads concurrently calling start_urgent.
+// Verifies PQ correctness under concurrent preemption from multiple EDs.
+TEST_F(PriorityQueueTest, multiple_eds_concurrent_preempt) {
+    const int NUM_EDS = 4;
+    const int TASKS_PER_ED = 50;
+    const int TOTAL = NUM_EDS * TASKS_PER_ED;
+    std::atomic<int> resume_count(0);
+
+    struct EDArg {
+        int ed_index;
+        int n_children;
+        std::atomic<int>* resume_count;
+    };
+
+    auto ed_fn = [](void* arg) -> void* {
+        EDArg* ea = static_cast<EDArg*>(arg);
+        bthread::TaskMeta* meta =
+            bthread::TaskGroup::address_meta(bthread_self());
+        meta->priority_index = ea->ed_index;
+
+        std::vector<bthread_t> children;
+        children.reserve(ea->n_children);
+        for (int i = 0; i < ea->n_children; ++i) {
+            int id = ea->ed_index * ea->n_children + i;
+            TaskArg* ta = new TaskArg{id};
+            bthread_t child;
+            bthread_start_urgent(&child, NULL, priority_task_fn, ta);
+            children.push_back(child);
+            ea->resume_count->fetch_add(1, std::memory_order_relaxed);
+        }
+        for (auto c : children) {
+            bthread_join(c, NULL);
+        }
+        return NULL;
+    };
+
+    bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL;
+    priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY;
+
+    std::vector<EDArg> ed_args(NUM_EDS);
+    std::vector<bthread_t> ed_tids(NUM_EDS);
+    for (int i = 0; i < NUM_EDS; ++i) {
+        ed_args[i] = {i, TASKS_PER_ED, &resume_count};
+        ASSERT_EQ(0, bthread_start_background(&ed_tids[i], &priority_attr,
+                                               ed_fn, &ed_args[i]));
+    }
+
+    for (int i = 0; i < NUM_EDS; ++i) {
+        bthread_join(ed_tids[i], NULL);
+    }
+
+    ASSERT_EQ(TOTAL, g_priority_count.load());
+    ASSERT_EQ(TOTAL, resume_count.load());
+    std::lock_guard<std::mutex> lk(g_tid_mutex);
+    ASSERT_EQ((size_t)TOTAL, g_executed_ids.size());
+}
+
+} // namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to