This is an automated email from the ASF dual-hosted git repository.

xiaofeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new fe63d790 Add pthread CPU affinity support (#3122)
fe63d790 is described below

commit fe63d790afa3d8f1a2721b8a5718abe5dcfe8c6f
Author: Mao <[email protected]>
AuthorDate: Sat Nov 1 01:09:16 2025 +0800

    Add pthread CPU affinity support (#3122)
    
    * Add pthread CPU affinity support
    
    * combine thread_affinity and cpu_set into one flag
    
    * review & Support MACOS thread affinity
    
    ---------
    
    Co-authored-by: m30070657 <[email protected]>
---
 src/bthread/task_control.cpp | 87 +++++++++++++++++++++++++++++++++++++++++++-
 src/bthread/task_control.h   |  5 +++
 2 files changed, 90 insertions(+), 2 deletions(-)

diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index e43ce1ec..99e4af50 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -20,6 +20,8 @@
 // Date: Tue Jul 10 17:40:58 CST 2012
 
 #include <pthread.h>
+#include <set>
+#include <regex>
 #include <sys/syscall.h>                   // SYS_gettid
 #include "butil/scoped_lock.h"             // BAIDU_SCOPED_LOCK
 #include "butil/errno.h"                   // berror
@@ -34,6 +36,9 @@
 #include "bthread/timer_thread.h"         // global_timer_thread
 #include <gflags/gflags.h>
 #include "bthread/log.h"
+#if defined(OS_MACOSX)
+#include <mach/mach.h>
+#endif
 
 DEFINE_int32(task_group_delete_delay, 1,
              "delay deletion of TaskGroup for so many seconds");
@@ -42,6 +47,9 @@ DEFINE_int32(task_group_runqueue_capacity, 4096,
 DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags");
 DEFINE_bool(task_group_set_worker_name, true,
             "Whether to set the name of the worker thread");
+DEFINE_string(cpu_set, "",
+              "Set of CPUs to which cores are bound. "
+              "for example, 0-3,5,7; default: disable");
 
 namespace bthread {
 
@@ -99,10 +107,14 @@ void* TaskControl::worker_thread(void* arg) {
 
     g->_tid = pthread_self();
 
+    int worker_id = c->_next_worker_id.fetch_add(
+                        1, butil::memory_order_relaxed);
+    if (!c->_cpus.empty()) {
+        bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id % 
c->_cpus.size()]);
+    }
     if (FLAGS_task_group_set_worker_name) {
         std::string worker_thread_name = butil::string_printf(
-            "brpc_wkr:%d-%d", g->tag(),
-            c->_next_worker_id.fetch_add(1, butil::memory_order_relaxed));
+            "brpc_wkr:%d-%d", g->tag(), worker_id);
         butil::PlatformThread::SetNameSimple(worker_thread_name.c_str());
     }
     BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid
@@ -209,6 +221,13 @@ int TaskControl::init(int concurrency) {
     }
     _concurrency = concurrency;
 
+    if (!FLAGS_cpu_set.empty()) {
+        if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1) {
+            LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set;
+            return -1;
+        }
+    }
+
     // task group group by tags
     for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
         _tagged_ngroup[i].store(0, std::memory_order_relaxed);
@@ -309,6 +328,70 @@ TaskGroup* TaskControl::choose_one_group(bthread_tag_t 
tag) {
     return NULL;
 }
 
+int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
+    static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*");
+    std::smatch match;
+    std::set<unsigned> cpuset;
+    if (value.empty()) {
+        return -1;
+    }
+    if (std::regex_match(value, match, r)) {
+        for (butil::StringSplitter split(value.data(), ','); split; ++split) {
+            butil::StringPiece cpu_ids(split.field(), split.length());
+            cpu_ids.trim_spaces();
+            butil::StringPiece begin = cpu_ids;
+            butil::StringPiece end = cpu_ids;
+            auto dash = cpu_ids.find('-');
+            if (dash != cpu_ids.npos) {
+                begin = cpu_ids.substr(0, dash);
+                end = cpu_ids.substr(dash + 1);
+            }
+            unsigned first = UINT_MAX;
+            unsigned last = 0;
+            int ret;
+            ret = butil::StringSplitter(begin, '\t').to_uint(&first);
+            ret = ret | butil::StringSplitter(end, '\t').to_uint(&last);
+            if (ret != 0 || first > last) {
+                return -1;
+            }
+            for (auto i = first; i <= last; ++i) {
+                cpuset.insert(i);
+            }
+        }
+        cpus.assign(cpuset.begin(), cpuset.end());
+        return 0;
+    }
+    return -1;
+}
+
+void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) {
+#if defined(OS_LINUX)
+        cpu_set_t cs;
+        CPU_ZERO(&cs);
+        CPU_SET(cpu_id, &cs);
+        auto r = pthread_setaffinity_np(pthread, sizeof(cs), &cs);
+        if (r != 0) {
+            LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id;
+        }
+        (void)r;
+#elif defined(OS_MACOSX)
+        thread_port_t mach_thread = pthread_mach_thread_np(pthread);
+        if (mach_thread != MACH_PORT_NULL) {
+            LOG(WARNING) << "mach_thread is null"
+                         << "Failed to bind thread to cpu: " << cpu_id;
+            return;
+        }
+        thread_affinity_policy_data_t policy;
+        policy.affinity_tag = cpu_id;
+        if (thread_policy_set(mach_thread,
+                THREAD_AFFINITY_POLICY,
+                (thread_policy_t)&policy,
+                THREAD_AFFINITY_POLICY_COUNT) != KERN_SUCCESS) {
+            LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id;
+        }
+#endif
+}
+
 #ifdef BRPC_BTHREAD_TRACER
 void TaskControl::stack_trace(std::ostream& os, bthread_t tid) {
     _task_tracer.Trace(os, tid);
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 439c96db..4480daa6 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -91,6 +91,10 @@ public:
     // If this method is called after init(), it never returns NULL.
     TaskGroup* choose_one_group(bthread_tag_t tag);
 
+    static int parse_cpuset(std::string value, std::vector<unsigned>& cpus);
+
+    static void bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id);
+
 #ifdef BRPC_BTHREAD_TRACER
     // A stacktrace of bthread can be helpful in debugging.
     void stack_trace(std::ostream& os, bthread_t tid);
@@ -139,6 +143,7 @@ private:
     bool _stop;
     butil::atomic<int> _concurrency;
     std::vector<pthread_t> _workers;
+    std::vector<unsigned> _cpus;
     butil::atomic<int> _next_worker_id;
 
     bvar::Adder<int64_t> _nworkers;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to