This is an automated email from the ASF dual-hosted git repository.
xiaofeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new fe63d790 Add pthread CPU affinity support (#3122)
fe63d790 is described below
commit fe63d790afa3d8f1a2721b8a5718abe5dcfe8c6f
Author: Mao <[email protected]>
AuthorDate: Sat Nov 1 01:09:16 2025 +0800
Add pthread CPU affinity support (#3122)
* Add pthread CPU affinity support
* combine thread_affinity and cpu_set into one flag
* review & Support MACOS thread affinity
---------
Co-authored-by: m30070657 <[email protected]>
---
src/bthread/task_control.cpp | 87 +++++++++++++++++++++++++++++++++++++++++++-
src/bthread/task_control.h | 5 +++
2 files changed, 90 insertions(+), 2 deletions(-)
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index e43ce1ec..99e4af50 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -20,6 +20,8 @@
// Date: Tue Jul 10 17:40:58 CST 2012
#include <pthread.h>
+#include <set>
+#include <regex>
#include <sys/syscall.h> // SYS_gettid
#include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK
#include "butil/errno.h" // berror
@@ -34,6 +36,9 @@
#include "bthread/timer_thread.h" // global_timer_thread
#include <gflags/gflags.h>
#include "bthread/log.h"
+#if defined(OS_MACOSX)
+#include <mach/mach.h>
+#endif
DEFINE_int32(task_group_delete_delay, 1,
"delay deletion of TaskGroup for so many seconds");
@@ -42,6 +47,9 @@ DEFINE_int32(task_group_runqueue_capacity, 4096,
DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags");
DEFINE_bool(task_group_set_worker_name, true,
"Whether to set the name of the worker thread");
+DEFINE_string(cpu_set, "",
+ "Set of CPUs to which cores are bound. "
+ "for example, 0-3,5,7; default: disable");
namespace bthread {
@@ -99,10 +107,14 @@ void* TaskControl::worker_thread(void* arg) {
g->_tid = pthread_self();
+ int worker_id = c->_next_worker_id.fetch_add(
+ 1, butil::memory_order_relaxed);
+ if (!c->_cpus.empty()) {
+ bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id %
c->_cpus.size()]);
+ }
if (FLAGS_task_group_set_worker_name) {
std::string worker_thread_name = butil::string_printf(
- "brpc_wkr:%d-%d", g->tag(),
- c->_next_worker_id.fetch_add(1, butil::memory_order_relaxed));
+ "brpc_wkr:%d-%d", g->tag(), worker_id);
butil::PlatformThread::SetNameSimple(worker_thread_name.c_str());
}
BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid
@@ -209,6 +221,13 @@ int TaskControl::init(int concurrency) {
}
_concurrency = concurrency;
+ if (!FLAGS_cpu_set.empty()) {
+ if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1) {
+ LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set;
+ return -1;
+ }
+ }
+
// task group group by tags
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
_tagged_ngroup[i].store(0, std::memory_order_relaxed);
@@ -309,6 +328,70 @@ TaskGroup* TaskControl::choose_one_group(bthread_tag_t
tag) {
return NULL;
}
+int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
+ static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*");
+ std::smatch match;
+ std::set<unsigned> cpuset;
+ if (value.empty()) {
+ return -1;
+ }
+ if (std::regex_match(value, match, r)) {
+ for (butil::StringSplitter split(value.data(), ','); split; ++split) {
+ butil::StringPiece cpu_ids(split.field(), split.length());
+ cpu_ids.trim_spaces();
+ butil::StringPiece begin = cpu_ids;
+ butil::StringPiece end = cpu_ids;
+ auto dash = cpu_ids.find('-');
+ if (dash != cpu_ids.npos) {
+ begin = cpu_ids.substr(0, dash);
+ end = cpu_ids.substr(dash + 1);
+ }
+ unsigned first = UINT_MAX;
+ unsigned last = 0;
+ int ret;
+ ret = butil::StringSplitter(begin, '\t').to_uint(&first);
+ ret = ret | butil::StringSplitter(end, '\t').to_uint(&last);
+ if (ret != 0 || first > last) {
+ return -1;
+ }
+ for (auto i = first; i <= last; ++i) {
+ cpuset.insert(i);
+ }
+ }
+ cpus.assign(cpuset.begin(), cpuset.end());
+ return 0;
+ }
+ return -1;
+}
+
+void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) {
+#if defined(OS_LINUX)
+ cpu_set_t cs;
+ CPU_ZERO(&cs);
+ CPU_SET(cpu_id, &cs);
+ auto r = pthread_setaffinity_np(pthread, sizeof(cs), &cs);
+ if (r != 0) {
+ LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id;
+ }
+ (void)r;
+#elif defined(OS_MACOSX)
+ thread_port_t mach_thread = pthread_mach_thread_np(pthread);
+ if (mach_thread != MACH_PORT_NULL) {
+ LOG(WARNING) << "mach_thread is null"
+ << "Failed to bind thread to cpu: " << cpu_id;
+ return;
+ }
+ thread_affinity_policy_data_t policy;
+ policy.affinity_tag = cpu_id;
+ if (thread_policy_set(mach_thread,
+ THREAD_AFFINITY_POLICY,
+ (thread_policy_t)&policy,
+ THREAD_AFFINITY_POLICY_COUNT) != KERN_SUCCESS) {
+ LOG(WARNING) << "Failed to bind thread to cpu: " << cpu_id;
+ }
+#endif
+}
+
#ifdef BRPC_BTHREAD_TRACER
void TaskControl::stack_trace(std::ostream& os, bthread_t tid) {
_task_tracer.Trace(os, tid);
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 439c96db..4480daa6 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -91,6 +91,10 @@ public:
// If this method is called after init(), it never returns NULL.
TaskGroup* choose_one_group(bthread_tag_t tag);
+ static int parse_cpuset(std::string value, std::vector<unsigned>& cpus);
+
+ static void bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id);
+
#ifdef BRPC_BTHREAD_TRACER
// A stacktrace of bthread can be helpful in debugging.
void stack_trace(std::ostream& os, bthread_t tid);
@@ -139,6 +143,7 @@ private:
bool _stop;
butil::atomic<int> _concurrency;
std::vector<pthread_t> _workers;
+ std::vector<unsigned> _cpus;
butil::atomic<int> _next_worker_id;
bvar::Adder<int64_t> _nworkers;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]