This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 3775b419 Support FastPthreadMutex contention profiler && expose
FastPthreadMutex to user (#2589)
3775b419 is described below
commit 3775b41918a0e8c27ea5ba9ba414f0eb2b84e283
Author: Bright Chen <[email protected]>
AuthorDate: Mon Jun 3 16:48:02 2024 +0800
Support FastPthreadMutex contention profiler && expose FastPthreadMutex to
user (#2589)
---
src/bthread/butex.cpp | 6 ++--
src/bthread/id.cpp | 2 +-
src/bthread/mutex.cpp | 80 +++++++++++++++++++++++++++++++++--------
src/bthread/mutex.h | 15 +++++++-
src/bthread/timer_thread.cpp | 2 +-
src/bthread/timer_thread.h | 2 +-
test/bthread_mutex_unittest.cpp | 40 +++++++++++++++++++--
7 files changed, 122 insertions(+), 25 deletions(-)
diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index 1dbd8930..2b7c78b8 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -121,7 +121,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Butex {
butil::atomic<int> value;
ButexWaiterList waiters;
- internal::FastPthreadMutex waiter_lock;
+ FastPthreadMutex waiter_lock;
};
BAIDU_CASSERT(offsetof(Butex, value) == 0, offsetof_value_must_0);
@@ -460,8 +460,8 @@ int butex_requeue(void* arg, void* arg2) {
ButexWaiter* front = NULL;
{
- std::unique_lock<internal::FastPthreadMutex> lck1(b->waiter_lock,
std::defer_lock);
- std::unique_lock<internal::FastPthreadMutex> lck2(m->waiter_lock,
std::defer_lock);
+ std::unique_lock<FastPthreadMutex> lck1(b->waiter_lock,
std::defer_lock);
+ std::unique_lock<FastPthreadMutex> lck2(m->waiter_lock,
std::defer_lock);
butil::double_lock(lck1, lck2);
if (b->waiters.empty()) {
return 0;
diff --git a/src/bthread/id.cpp b/src/bthread/id.cpp
index ba77580a..7aabed68 100644
--- a/src/bthread/id.cpp
+++ b/src/bthread/id.cpp
@@ -114,7 +114,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Id {
// contended_ver: locked and contended
uint32_t first_ver;
uint32_t locked_ver;
- internal::FastPthreadMutex mutex;
+ FastPthreadMutex mutex;
void* data;
int (*on_error)(bthread_id_t, void*, int);
int (*on_error2)(bthread_id_t, void*, int, const std::string&);
diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp
index d22c8753..357452ee 100644
--- a/src/bthread/mutex.cpp
+++ b/src/bthread/mutex.cpp
@@ -448,7 +448,8 @@ int first_sys_pthread_mutex_unlock(pthread_mutex_t* mutex) {
return sys_pthread_mutex_unlock(mutex);
}
-inline uint64_t hash_mutex_ptr(const pthread_mutex_t* m) {
+template <typename Mutex>
+inline uint64_t hash_mutex_ptr(const Mutex* m) {
return butil::fmix64((uint64_t)m);
}
@@ -468,7 +469,7 @@ static __thread bool tls_inside_lock = false;
#ifndef DONT_SPEEDUP_PTHREAD_CONTENTION_PROFILER_WITH_TLS
const int TLS_MAX_COUNT = 3;
struct MutexAndContentionSite {
- pthread_mutex_t* mutex;
+ void* mutex;
bthread_contention_site_t csite;
};
struct TLSPthreadContentionSites {
@@ -482,8 +483,9 @@ static __thread TLSPthreadContentionSites tls_csites =
{0,0,{}};
// Guaranteed in linux/win.
const int PTR_BITS = 48;
+template <typename Mutex>
inline bthread_contention_site_t*
-add_pthread_contention_site(pthread_mutex_t* mutex) {
+add_pthread_contention_site(const Mutex* mutex) {
MutexMapEntry& entry = g_mutex_map[hash_mutex_ptr(mutex) & (MUTEX_MAP_SIZE
- 1)];
butil::static_atomic<uint64_t>& m = entry.versioned_mutex;
uint64_t expected = m.load(butil::memory_order_relaxed);
@@ -500,8 +502,9 @@ add_pthread_contention_site(pthread_mutex_t* mutex) {
return NULL;
}
-inline bool remove_pthread_contention_site(
- pthread_mutex_t* mutex, bthread_contention_site_t* saved_csite) {
+template <typename Mutex>
+inline bool remove_pthread_contention_site(const Mutex* mutex,
+ bthread_contention_site_t*
saved_csite) {
MutexMapEntry& entry = g_mutex_map[hash_mutex_ptr(mutex) & (MUTEX_MAP_SIZE
- 1)];
butil::static_atomic<uint64_t>& m = entry.versioned_mutex;
if ((m.load(butil::memory_order_relaxed) & ((((uint64_t)1) << PTR_BITS) -
1))
@@ -538,16 +541,44 @@ void submit_contention(const bthread_contention_site_t&
csite, int64_t now_ns) {
tls_inside_lock = false;
}
-BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
+namespace internal {
+BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(pthread_mutex_t* mutex) {
+ return sys_pthread_mutex_lock(mutex);
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(pthread_mutex_t* mutex) {
+ return ::pthread_mutex_trylock(mutex);
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(pthread_mutex_t* mutex) {
+ return sys_pthread_mutex_unlock(mutex);
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_lock_internal(FastPthreadMutex* mutex) {
+ mutex->lock();
+ return 0;
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_trylock_internal(FastPthreadMutex* mutex)
{
+ return mutex->try_lock() ? 0 : EBUSY;
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_unlock_internal(FastPthreadMutex* mutex) {
+ mutex->unlock();
+ return 0;
+}
+
+template <typename Mutex>
+BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex* mutex) {
// Don't change behavior of lock when profiler is off.
if (!g_cp ||
// collecting code including backtrace() and submit() may call
// pthread_mutex_lock and cause deadlock. Don't sample.
tls_inside_lock) {
- return sys_pthread_mutex_lock(mutex);
+ return pthread_mutex_lock_internal(mutex);
}
// Don't slow down non-contended locks.
- int rc = pthread_mutex_trylock(mutex);
+ int rc = pthread_mutex_trylock_internal(mutex);
if (rc != EBUSY) {
return rc;
}
@@ -567,16 +598,16 @@ BUTIL_FORCE_INLINE int
pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
csite = &entry.csite;
if (!sampling_range) {
make_contention_site_invalid(&entry.csite);
- return sys_pthread_mutex_lock(mutex);
+ return pthread_mutex_lock_internal(mutex);
}
}
#endif
if (!sampling_range) { // don't sample
- return sys_pthread_mutex_lock(mutex);
+ return pthread_mutex_lock_internal(mutex);
}
// Lock and monitor the waiting time.
const int64_t start_ns = butil::cpuwide_time_ns();
- rc = sys_pthread_mutex_lock(mutex);
+ rc = pthread_mutex_lock_internal(mutex);
if (!rc) { // Inside lock
if (!csite) {
csite = add_pthread_contention_site(mutex);
@@ -590,13 +621,14 @@ BUTIL_FORCE_INLINE int
pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
return rc;
}
-BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
+template <typename Mutex>
+BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(Mutex* mutex) {
// Don't change behavior of unlock when profiler is off.
if (!g_cp || tls_inside_lock) {
// This branch brings an issue that an entry created by
- // add_pthread_contention_site may not be cleared. Thus we add a
+ // add_pthread_contention_site may not be cleared. Thus we add a
// 16-bit rolling version in the entry to find out such entry.
- return sys_pthread_mutex_unlock(mutex);
+ return pthread_mutex_unlock_internal(mutex);
}
int64_t unlock_start_ns = 0;
bool miss_in_tls = true;
@@ -622,7 +654,7 @@ BUTIL_FORCE_INLINE int
pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
unlock_start_ns = butil::cpuwide_time_ns();
}
}
- const int rc = sys_pthread_mutex_unlock(mutex);
+ const int rc = pthread_mutex_unlock_internal(mutex);
// [Outside lock]
if (unlock_start_ns) {
const int64_t unlock_end_ns = butil::cpuwide_time_ns();
@@ -632,6 +664,16 @@ BUTIL_FORCE_INLINE int
pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
return rc;
}
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(pthread_mutex_t* mutex) {
+ return internal::pthread_mutex_lock_impl(mutex);
+}
+
+BUTIL_FORCE_INLINE int pthread_mutex_unlock_impl(pthread_mutex_t* mutex) {
+ return internal::pthread_mutex_unlock_impl(mutex);
+}
+
// Implement bthread_mutex_t related functions
struct MutexInternal {
butil::static_atomic<unsigned char> locked;
@@ -714,6 +756,14 @@ void FastPthreadMutex::unlock() {
} // namespace internal
#endif // BTHREAD_USE_FAST_PTHREAD_MUTEX
+void FastPthreadMutex::lock() {
+ internal::pthread_mutex_lock_impl(&_mutex);
+}
+
+void FastPthreadMutex::unlock() {
+ internal::pthread_mutex_unlock_impl(&_mutex);
+}
+
} // namespace bthread
extern "C" {
diff --git a/src/bthread/mutex.h b/src/bthread/mutex.h
index 242a620f..ad6d2e5c 100644
--- a/src/bthread/mutex.h
+++ b/src/bthread/mutex.h
@@ -72,7 +72,7 @@ namespace internal {
class FastPthreadMutex {
public:
FastPthreadMutex() : _futex(0) {}
- ~FastPthreadMutex() {}
+ ~FastPthreadMutex() = default;
void lock();
void unlock();
bool try_lock();
@@ -86,6 +86,19 @@ typedef butil::Mutex FastPthreadMutex;
#endif
}
+class FastPthreadMutex {
+public:
+ FastPthreadMutex() = default;
+ ~FastPthreadMutex() = default;
+ DISALLOW_COPY_AND_ASSIGN(FastPthreadMutex);
+
+ void lock();
+ void unlock();
+ bool try_lock() { return _mutex.try_lock(); }
+private:
+ internal::FastPthreadMutex _mutex;
+};
+
} // namespace bthread
// Specialize std::lock_guard and std::unique_lock for bthread_mutex_t
diff --git a/src/bthread/timer_thread.cpp b/src/bthread/timer_thread.cpp
index 3b2f8a76..ee80568c 100644
--- a/src/bthread/timer_thread.cpp
+++ b/src/bthread/timer_thread.cpp
@@ -92,7 +92,7 @@ public:
Task* consume_tasks();
private:
- internal::FastPthreadMutex _mutex;
+ FastPthreadMutex _mutex;
int64_t _nearest_run_time;
Task* _task_head;
};
diff --git a/src/bthread/timer_thread.h b/src/bthread/timer_thread.h
index 139c2e98..1be061cc 100644
--- a/src/bthread/timer_thread.h
+++ b/src/bthread/timer_thread.h
@@ -95,7 +95,7 @@ private:
TimerThreadOptions _options;
Bucket* _buckets; // list of tasks to be run
- internal::FastPthreadMutex _mutex; // protect _nearest_run_time
+ FastPthreadMutex _mutex; // protect _nearest_run_time
int64_t _nearest_run_time;
// the futex for wake up timer thread. can't use _nearest_run_time because
// it's 64-bit.
diff --git a/test/bthread_mutex_unittest.cpp b/test/bthread_mutex_unittest.cpp
index 38c43eed..21bd6044 100644
--- a/test/bthread_mutex_unittest.cpp
+++ b/test/bthread_mutex_unittest.cpp
@@ -229,8 +229,9 @@ TEST(MutexTest, performance) {
PerfTest(&bth_mutex, (bthread_t*)NULL, thread_num,
bthread_start_background, bthread_join);
}
+template <typename Mutex>
void* loop_until_stopped(void* arg) {
- bthread::Mutex *m = (bthread::Mutex*)arg;
+ auto m = (Mutex*)arg;
while (!g_stopped) {
BAIDU_SCOPED_LOCK(*m);
bthread_usleep(20);
@@ -251,11 +252,11 @@ TEST(MutexTest, mix_thread_types) {
// true, thus loop_until_stopped spins forever)
bthread_setconcurrency(M);
for (int i = 0; i < N; ++i) {
- ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped,
&m));
+ ASSERT_EQ(0, pthread_create(&pthreads[i], NULL,
loop_until_stopped<bthread::Mutex>, &m));
}
for (int i = 0; i < M; ++i) {
const bthread_attr_t *attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD;
- ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr,
loop_until_stopped, &m));
+ ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr,
loop_until_stopped<bthread::Mutex>, &m));
}
bthread_usleep(1000L * 1000);
g_stopped = true;
@@ -266,4 +267,37 @@ TEST(MutexTest, mix_thread_types) {
pthread_join(pthreads[i], NULL);
}
}
+
+TEST(MutexTest, fast_pthread_mutex) {
+ bthread::FastPthreadMutex mutex;
+ ASSERT_TRUE(mutex.try_lock());
+ mutex.unlock();
+ mutex.lock();
+ mutex.unlock();
+ {
+ BAIDU_SCOPED_LOCK(mutex);
+ }
+ {
+ std::unique_lock<bthread::FastPthreadMutex> lck1;
+ std::unique_lock<bthread::FastPthreadMutex> lck2(mutex);
+ lck1.swap(lck2);
+ lck1.unlock();
+ lck1.lock();
+ }
+ ASSERT_TRUE(mutex.try_lock());
+ mutex.unlock();
+
+ const int N = 16;
+ pthread_t pthreads[N];
+ for (int i = 0; i < N; ++i) {
+ ASSERT_EQ(0, pthread_create(&pthreads[i], NULL,
+ loop_until_stopped<bthread::FastPthreadMutex>, &mutex));
+ }
+ bthread_usleep(1000L * 1000);
+ g_stopped = true;
+ for (int i = 0; i < N; ++i) {
+ pthread_join(pthreads[i], NULL);
+ }
+}
+
} // namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]