This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new fea2952a Active Spinning and queue old bthread at the head for bthread mutex (#2749)
fea2952a is described below
commit fea2952aaf7b2ee8ef1953294321b78f94ec683f
Author: Bright Chen <[email protected]>
AuthorDate: Fri Sep 6 10:42:35 2024 +0800
Active Spinning and queue old bthread at the head for bthread mutex (#2749)
---
src/bthread/butex.cpp | 29 +++++++++++++-----
src/bthread/butex.h | 7 ++++-
src/bthread/mutex.cpp | 63 ++++++++++++++++++++++++--------------
src/bthread/task_group.h | 5 +++
src/butil/containers/linked_list.h | 5 +++
src/butil/thread_local.h | 18 +++++++++--
6 files changed, 93 insertions(+), 34 deletions(-)
diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index 2b7c78b8..b603d89c 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -537,8 +537,14 @@ inline bool erase_from_butex(ButexWaiter* bw, bool wakeup, WaiterState state) {
return erased;
}
+struct WaitForButexArgs {
+ ButexBthreadWaiter* bw;
+ bool prepend;
+};
+
static void wait_for_butex(void* arg) {
- ButexBthreadWaiter* const bw = static_cast<ButexBthreadWaiter*>(arg);
+ auto args = static_cast<WaitForButexArgs*>(arg);
+ ButexBthreadWaiter* const bw = args->bw;
Butex* const b = bw->initial_butex;
// 1: waiter with timeout should have waiter_state == WAITER_STATE_READY
// before they're queued, otherwise the waiter is already timedout
@@ -560,7 +566,11 @@ static void wait_for_butex(void* arg) {
bw->waiter_state = WAITER_STATE_UNMATCHEDVALUE;
} else if (bw->waiter_state == WAITER_STATE_READY/*1*/ &&
!bw->task_meta->interrupted) {
- b->waiters.Append(bw);
+ if (args->prepend) {
+ b->waiters.Prepend(bw);
+ } else {
+ b->waiters.Append(bw);
+ }
bw->container.store(b, butil::memory_order_relaxed);
if (bw->abstime != NULL) {
bw->sleep_id = get_global_timer_thread()->schedule(
@@ -593,7 +603,7 @@ static void wait_for_butex(void* arg) {
}
static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
- const timespec* abstime) {
+ const timespec* abstime, bool prepend) {
TaskMeta* task = NULL;
ButexPthreadWaiter pw;
pw.tid = 0;
@@ -616,7 +626,11 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
errno = EINTR;
rc = -1;
} else {
- b->waiters.Append(&pw);
+ if (prepend) {
+ b->waiters.Prepend(&pw);
+ } else {
+ b->waiters.Append(&pw);
+ }
pw.container.store(b, butil::memory_order_relaxed);
b->waiter_lock.unlock();
@@ -646,7 +660,7 @@ static int butex_wait_from_pthread(TaskGroup* g, Butex* b, int expected_value,
return rc;
}
-int butex_wait(void* arg, int expected_value, const timespec* abstime) {
+int butex_wait(void* arg, int expected_value, const timespec* abstime, bool prepend) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
if (b->value.load(butil::memory_order_relaxed) != expected_value) {
errno = EWOULDBLOCK;
@@ -657,7 +671,7 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
}
TaskGroup* g = tls_task_group;
if (NULL == g || g->is_current_pthread_task()) {
- return butex_wait_from_pthread(g, b, expected_value, abstime);
+ return butex_wait_from_pthread(g, b, expected_value, abstime, prepend);
}
ButexBthreadWaiter bbw;
// tid is 0 iff the thread is non-bthread
@@ -690,7 +704,8 @@ int butex_wait(void* arg, int expected_value, const timespec* abstime) {
// release fence matches with acquire fence in interrupt_and_consume_waiters
// in task_group.cpp to guarantee visibility of `interrupted'.
bbw.task_meta->current_waiter.store(&bbw, butil::memory_order_release);
- g->set_remained(wait_for_butex, &bbw);
+ WaitForButexArgs args{ &bbw, prepend};
+ g->set_remained(wait_for_butex, &args);
TaskGroup::sched(&g);
// erase_from_butex_and_wakeup (called by TimerThread) is possibly still
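
For readers skimming the diff: both wait paths above now make the same head-or-tail decision before parking the waiter. Extracted as a stand-alone helper it is just the following (an illustrative sketch, not code from the commit; note that butex_wake() pops waiters from the head of this list, so Prepend() means "woken next"):

    template <typename Waiter>
    static void enqueue_waiter(butil::LinkedList<Waiter>& waiters,
                               Waiter* w, bool prepend) {
        if (prepend) {
            waiters.Prepend(w);  // head of the queue: picked first by wake-one
        } else {
            waiters.Append(w);   // tail of the queue: default FIFO order
        }
    }
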
diff --git a/src/bthread/butex.h b/src/bthread/butex.h
index 93a1f6ec..b40ec1e0 100644
--- a/src/bthread/butex.h
+++ b/src/bthread/butex.h
@@ -67,8 +67,13 @@ int butex_requeue(void* butex1, void* butex2);
// abstime is not NULL.
// About |abstime|:
// Different from FUTEX_WAIT, butex_wait uses absolute time.
+// About |prepend|:
+// If |prepend| is true, queue the bthread at the head of the queue,
+// otherwise at the tail.
// Returns 0 on success, -1 otherwise and errno is set.
-int butex_wait(void* butex, int expected_value, const timespec* abstime);
+int butex_wait(void* butex, int expected_value,
+ const timespec* abstime,
+ bool prepend = false);
} // namespace bthread
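
A minimal usage sketch of the extended interface (a hypothetical caller, not part of the commit; the error handling mirrors mutex.cpp below):

    #include <errno.h>
    #include "bthread/butex.h"

    // Wait until *butex_word no longer equals |expected|. Passing
    // prepend=true queues this waiter at the head, so the next
    // butex_wake() picks it first.
    int wait_once(void* butex_word, int expected, bool lost_before) {
        const int rc = bthread::butex_wait(butex_word, expected,
                                           /*abstime=*/NULL,
                                           /*prepend=*/lost_before);
        if (rc < 0 && errno != EWOULDBLOCK && errno != EINTR) {
            return errno;  // a real error; interruptions are tolerated
        }
        return 0;
    }
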
diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp
index f2606bb9..403f6bb8 100644
--- a/src/bthread/mutex.cpp
+++ b/src/bthread/mutex.cpp
@@ -39,17 +39,22 @@
#include "butil/logging.h"
#include "butil/object_pool.h"
#include "butil/debug/stack_trace.h"
+#include "butil/thread_local.h"
#include "bthread/butex.h" // butex_*
#include "bthread/mutex.h" // bthread_mutex_t
#include "bthread/sys_futex.h"
#include "bthread/log.h"
-#include "butil/debug/stack_trace.h"
+#include "bthread/processor.h"
+#include "bthread/task_group.h"
extern "C" {
extern void* BAIDU_WEAK _dl_sym(void* handle, const char* symbol, void* caller);
}
namespace bthread {
+
+EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);
+
// Warm up backtrace before main().
const butil::debug::StackTrace ALLOW_UNUSED dummy_bt;
@@ -772,29 +777,41 @@ const MutexInternal MUTEX_LOCKED_RAW = {{1},{0},0};
BAIDU_CASSERT(sizeof(unsigned) == sizeof(MutexInternal),
sizeof_mutex_internal_must_equal_unsigned);
-inline int mutex_lock_contended(bthread_mutex_t* m) {
- butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
- while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
- if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, NULL) < 0 &&
- errno != EWOULDBLOCK && errno != EINTR/*note*/) {
- // a mutex lock should ignore interruptions in general since
- // user code is unlikely to check the return value.
- return errno;
+const int MAX_SPIN_ITER = 4;
+
+inline int mutex_lock_contended_impl(
+ bthread_mutex_t* m, const struct timespec* __restrict abstime) {
+ // When a bthread first contends for a lock, active spinning makes sense.
+ // Spin only a few times and only if local `rq' is empty.
+ TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
+ if (BAIDU_UNLIKELY(NULL == g || g->rq_size() == 0)) {
+ for (int i = 0; i < MAX_SPIN_ITER; ++i) {
+ cpu_relax();
}
}
- return 0;
-}
-inline int mutex_timedlock_contended(
- bthread_mutex_t* m, const struct timespec* __restrict abstime) {
- butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
+ bool queue_lifo = false;
+ bool first_wait = true;
+ auto whole = (butil::atomic<unsigned>*)m->butex;
while (whole->exchange(BTHREAD_MUTEX_CONTENDED) & BTHREAD_MUTEX_LOCKED) {
- if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime) < 0 &&
+ if (bthread::butex_wait(whole, BTHREAD_MUTEX_CONTENDED, abstime, queue_lifo) < 0 &&
errno != EWOULDBLOCK && errno != EINTR/*note*/) {
- // a mutex lock should ignore interrruptions in general since
+ // A mutex lock should ignore interruptions in general since
// user code is unlikely to check the return value.
return errno;
}
+ // Ignore EWOULDBLOCK and EINTR.
+ if (first_wait && 0 == errno) {
+ first_wait = false;
+ }
+ if (!first_wait) {
+ // Normally, bthreads are queued in FIFO order. But when competing with
+ // newly arriving bthreads for ownership of the mutex, a woken-up bthread
+ // has a good chance of losing, because the new arrivals are already
+ // running on CPU and there can be lots of them. In such a case, for
+ // fairness and to avoid starvation, it is queued at the head of the
+ // waiter queue.
+ queue_lifo = true;
+ }
}
return 0;
}
@@ -880,7 +897,7 @@ int bthread_mutex_trylock(bthread_mutex_t* m) {
}
int bthread_mutex_lock_contended(bthread_mutex_t* m) {
- return bthread::mutex_lock_contended(m);
+ return bthread::mutex_lock_contended_impl(m, NULL);
}
int bthread_mutex_lock(bthread_mutex_t* m) {
@@ -890,18 +907,18 @@ int bthread_mutex_lock(bthread_mutex_t* m) {
}
// Don't sample when contention profiler is off.
if (!bthread::g_cp) {
- return bthread::mutex_lock_contended(m);
+ return bthread::mutex_lock_contended_impl(m, NULL);
}
// Ask Collector if this (contended) locking should be sampled.
const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
if (!sampling_range) { // Don't sample
- return bthread::mutex_lock_contended(m);
+ return bthread::mutex_lock_contended_impl(m, NULL);
}
// Start sampling.
const int64_t start_ns = butil::cpuwide_time_ns();
// NOTE: Don't modify m->csite outside lock since multiple threads are
// still contending with each other.
- const int rc = bthread::mutex_lock_contended(m);
+ const int rc = bthread::mutex_lock_contended_impl(m, NULL);
if (!rc) { // Inside lock
m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
m->csite.sampling_range = sampling_range;
@@ -917,18 +934,18 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
}
// Don't sample when contention profiler is off.
if (!bthread::g_cp) {
- return bthread::mutex_timedlock_contended(m, abstime);
+ return bthread::mutex_lock_contended_impl(m, abstime);
}
// Ask Collector if this (contended) locking should be sampled.
const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
if (!sampling_range) { // Don't sample
- return bthread::mutex_timedlock_contended(m, abstime);
+ return bthread::mutex_lock_contended_impl(m, abstime);
}
// Start sampling.
const int64_t start_ns = butil::cpuwide_time_ns();
// NOTE: Don't modify m->csite outside lock since multiple threads are
// still contending with each other.
- const int rc = bthread::mutex_timedlock_contended(m, abstime);
+ const int rc = bthread::mutex_lock_contended_impl(m, abstime);
if (!rc) { // Inside lock
m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
m->csite.sampling_range = sampling_range;
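
The net effect of the mutex.cpp changes, condensed into portable standard C++ for readers without the brpc tree (a sketch under illustrative names and values: park_on() stands in for butex_wait(), std::this_thread::yield() for cpu_relax(), and the LIFO flag is only recorded because std::atomic has no waiter queue to reorder):

    #include <atomic>
    #include <thread>

    constexpr int kMaxSpinIter = 4;       // mirrors MAX_SPIN_ITER
    constexpr unsigned kLocked = 1;       // stand-in for BTHREAD_MUTEX_LOCKED
    constexpr unsigned kContended = 257;  // stand-in for BTHREAD_MUTEX_CONTENDED

    // Stand-in for butex_wait(): block while *w still equals |expected|.
    static void park_on(std::atomic<unsigned>* w, unsigned expected,
                        bool /*queue_lifo*/) {
        while (w->load(std::memory_order_relaxed) == expected) {
            std::this_thread::yield();
        }
    }

    int lock_contended(std::atomic<unsigned>* whole, bool run_queue_empty) {
        // 1. Active spinning: a handful of iterations, and only when this
        //    worker has nothing else runnable, so spinning cannot delay
        //    ready tasks.
        if (run_queue_empty) {
            for (int i = 0; i < kMaxSpinIter; ++i) {
                std::this_thread::yield();
            }
        }
        // 2. Park-and-retry: after the first lost wakeup, requeue at the
        //    head (LIFO) so newly arriving lockers cannot starve this waiter.
        bool queue_lifo = false;
        while (whole->exchange(kContended) & kLocked) {
            park_on(whole, kContended, queue_lifo);
            queue_lifo = true;
        }
        return 0;  // lock acquired
    }
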
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index b71994a0..a19bd023 100644
--- a/src/bthread/task_group.h
+++ b/src/bthread/task_group.h
@@ -182,6 +182,11 @@ public:
// process make go on indefinitely.
void push_rq(bthread_t tid);
+ // Returns size of local run queue.
+ size_t rq_size() const {
+ return _rq.volatile_size();
+ }
+
bthread_tag_t tag() const { return _tag; }
private:
diff --git a/src/butil/containers/linked_list.h b/src/butil/containers/linked_list.h
index 7130c046..7874b65a 100644
--- a/src/butil/containers/linked_list.h
+++ b/src/butil/containers/linked_list.h
@@ -171,6 +171,11 @@ class LinkedList {
e->InsertBefore(&root_);
}
+ // Prepend |e| to the head of the linked list.
+ void Prepend(LinkNode<T>* e) {
+ e->InsertAfter(&root_);
+ }
+
LinkNode<T>* head() const {
return root_.next();
}
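
A small ordering demonstration of the new method (illustrative, not part of the commit):

    #include "butil/containers/linked_list.h"

    struct Waiter : public butil::LinkNode<Waiter> {
        explicit Waiter(int i) : id(i) {}
        int id;
    };

    void demo() {
        butil::LinkedList<Waiter> waiters;
        Waiter a(1), b(2), c(3);
        waiters.Append(&a);   // list: a         (FIFO arrival order)
        waiters.Append(&b);   // list: a, b
        waiters.Prepend(&c);  // list: c, a, b   (c jumps the queue)
        // A wake-one operation that pops head() now picks c first.
        Waiter* first = waiters.head()->value();  // == &c
        (void)first;
    }
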
diff --git a/src/butil/thread_local.h b/src/butil/thread_local.h
index a3cb1ff0..a67327c6 100644
--- a/src/butil/thread_local.h
+++ b/src/butil/thread_local.h
@@ -32,20 +32,25 @@
#define BAIDU_VOLATILE_THREAD_LOCAL(type, var_name, default_value) \
BAIDU_THREAD_LOCAL type var_name = default_value; \
- static __attribute__((noinline, unused)) type get_##var_name(void) { \
+ __attribute__((noinline, unused)) type get_##var_name(void) { \
asm volatile(""); \
return var_name; \
} \
- static __attribute__((noinline, unused)) type *get_ptr_##var_name(void) { \
+ __attribute__((noinline, unused)) type *get_ptr_##var_name(void) { \
type *ptr = &var_name; \
asm volatile("" : "+rm"(ptr)); \
return ptr; \
} \
- static __attribute__((noinline, unused)) void set_##var_name(type v) { \
+ __attribute__((noinline, unused)) void set_##var_name(type v) { \
asm volatile(""); \
var_name = v; \
}
+#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name) \
+ type get_##var_name(void); \
+ type *get_ptr_##var_name(void); \
+ void set_##var_name(type v)
+
#if (defined (__aarch64__) && defined (__GNUC__)) || defined(__clang__)
// GNU compiler under aarch and Clang compiler is incorrectly caching the
// address of thread_local variables across a suspend-point. The following
@@ -53,10 +58,17 @@
#define BAIDU_GET_VOLATILE_THREAD_LOCAL(var_name) get_##var_name()
#define BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(var_name) get_ptr_##var_name()
#define BAIDU_SET_VOLATILE_THREAD_LOCAL(var_name, value) set_##var_name(value)
+
+#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name) \
+ type get_##var_name(void); \
+ type *get_ptr_##var_name(void); \
+ void set_##var_name(type v)
#else
#define BAIDU_GET_VOLATILE_THREAD_LOCAL(var_name) var_name
#define BAIDU_GET_PTR_VOLATILE_THREAD_LOCAL(var_name) &##var_name
#define BAIDU_SET_VOLATILE_THREAD_LOCAL(var_name, value) var_name = value
+#define EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(type, var_name) \
+ extern BAIDU_THREAD_LOCAL type var_name
#endif
namespace butil {
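
How the new macro pairs with the existing one across translation units (a sketch of the intended usage, matching the mutex.cpp hunk above; tls_task_group is defined in task_group.cpp):

    // In the defining translation unit (task_group.cpp):
    BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group, NULL);

    // In any other translation unit (e.g. mutex.cpp above):
    EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*, tls_task_group);

    TaskGroup* current_group() {
        // On aarch64/clang this expands to get_tls_task_group(), a noinline
        // call that keeps the compiler from caching the TLS address across
        // a bthread suspend-point; elsewhere it is a plain variable read.
        return BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group);
    }
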
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]