This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new f17048e1 Support semaphore and rwlock for bthread (#2752)
f17048e1 is described below
commit f17048e18011d842bcaf3f37e50dc45307573a35
Author: Bright Chen <[email protected]>
AuthorDate: Thu Sep 26 10:49:23 2024 +0800
Support semaphore and rwlock for bthread (#2752)
* Support bthread semaphore
* Support bthread rwlock
* Support contention profiler for semaphore and rwlock
---
src/bthread/bthread.h | 66 +++++-
src/bthread/butex.cpp | 8 +-
src/bthread/butex.h | 5 +
src/bthread/mutex.cpp | 87 ++++----
src/bthread/mutex.h | 14 +-
src/bthread/rwlock.cpp | 368 +++++++++++++++++++++++++++++++++
src/bthread/rwlock.h | 214 ++++++++++++++++++++
src/bthread/semaphore.cpp | 173 ++++++++++++++++
src/bthread/types.h | 14 ++
src/bvar/collector.h | 8 +-
test/BUILD.bazel | 2 +
test/bthread_rwlock_unittest.cpp | 393 +++++++++++++++++++++++++++++++++++-
test/bthread_semaphore_unittest.cpp | 208 +++++++++++++++++++
13 files changed, 1493 insertions(+), 67 deletions(-)
diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h
index 68734e05..8532b3b3 100644
--- a/src/bthread/bthread.h
+++ b/src/bthread/bthread.h
@@ -170,7 +170,7 @@ extern int bthread_usleep(uint64_t microseconds);
// NOTE: mutexattr is not used in current mutex implementation. User shall
// always pass a NULL attribute.
extern int bthread_mutex_init(bthread_mutex_t* __restrict mutex,
- const bthread_mutexattr_t* __restrict
mutex_attr);
+ const bthread_mutexattr_t* __restrict attr);
// Destroy `mutex'.
extern int bthread_mutex_destroy(bthread_mutex_t* mutex);
@@ -188,6 +188,13 @@ extern int bthread_mutex_timedlock(bthread_mutex_t*
__restrict mutex,
// Unlock `mutex'.
extern int bthread_mutex_unlock(bthread_mutex_t* mutex);
+extern int bthread_mutexattr_init(bthread_mutexattr_t* attr);
+
+// Disable the contention profile of the mutex.
+extern int bthread_mutexattr_disable_csite(bthread_mutexattr_t* attr);
+
+extern int bthread_mutexattr_destroy(bthread_mutexattr_t* attr);
+
// -----------------------------------------------
// Functions for handling conditional variables.
// -----------------------------------------------
@@ -241,9 +248,8 @@ extern int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock);
extern int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock);
// Try to acquire read lock for `rwlock' or return after specfied time.
-extern int bthread_rwlock_timedrdlock(
- bthread_rwlock_t* __restrict rwlock,
- const struct timespec* __restrict abstime);
+extern int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
+ const struct timespec* __restrict
abstime);
// Acquire write lock for `rwlock'.
extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock);
@@ -252,9 +258,8 @@ extern int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock);
extern int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock);
// Try to acquire write lock for `rwlock' or return after specfied time.
-extern int bthread_rwlock_timedwrlock(
- bthread_rwlock_t* __restrict rwlock,
- const struct timespec* __restrict abstime);
+extern int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
+ const struct timespec* __restrict
abstime);
// Unlock `rwlock'.
extern int bthread_rwlock_unlock(bthread_rwlock_t* rwlock);
@@ -277,6 +282,53 @@ extern int bthread_rwlockattr_getkind_np(const
bthread_rwlockattr_t* attr,
extern int bthread_rwlockattr_setkind_np(bthread_rwlockattr_t* attr,
int pref);
+// -------------------------------------------
+// Functions for handling semaphore.
+// -------------------------------------------
+
+// Initialize the semaphore referred to by `sem'. The value of the
+// initialized semaphore shall be `value'.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_init(bthread_sem_t* sem, unsigned value);
+
+// Disable the contention profile of the semaphore referred to by `sem'.
+extern int bthread_sem_disable_csite(bthread_sem_t* sem);
+
+// Destroy the semaphore indicated by `sem'.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_destroy(bthread_sem_t* sem);
+
+// Lock the semaphore referenced by `sem' by performing a semaphore
+// lock operation on that semaphore. If the semaphore value is currently
+// zero, then the calling (b)thread shall not return from the call to
+// bthread_sem_wait() function until it locks the semaphore.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_wait(bthread_sem_t* sem);
+
+// Lock the semaphore referenced by `sem' as in the bthread_sem_wait()
+// function. However, if the semaphore cannot be locked without waiting
+// for another (b)thread to unlock the semaphore by performing a
+// bthread_sem_post() function, this wait shall be terminated when the
+// specified timeout expires.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_timedwait(bthread_sem_t* sem, const struct timespec*
abstime);
+
+// Lock the semaphore referenced by `sem' only if the semaphore is
+// currently not locked; that is, if the semaphore value is currently
+// positive. Otherwise, it shall not lock the semaphore.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_trywait(bthread_sem_t* sem);
+
+// Unlock the semaphore referenced by `sem' by performing
+// a semaphore unlock operation on that semaphore.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_post(bthread_sem_t* sem);
+
+// Unlock the semaphore referenced by `sem' by performing
+// `n' semaphore unlock operation on that semaphore.
+// Return 0 on success, errno otherwise.
+extern int bthread_sem_post_n(bthread_sem_t* sem, size_t n);
+
// ----------------------------------------------------------------------
// Functions for handling barrier which is a new feature in 1003.1j-2000.
diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index b603d89c..4a0f9c37 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -329,14 +329,14 @@ int butex_wake(void* arg, bool nosignal) {
return 1;
}
-int butex_wake_all(void* arg, bool nosignal) {
+int butex_wake_n(void* arg, size_t n, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex,
value);
ButexWaiterList bthread_waiters;
ButexWaiterList pthread_waiters;
{
BAIDU_SCOPED_LOCK(b->waiter_lock);
- while (!b->waiters.empty()) {
+ for (size_t i = 0; (n == 0 || i < n) && !b->waiters.empty(); ++i) {
ButexWaiter* bw = b->waiters.head()->value();
bw->RemoveFromList();
bw->container.store(NULL, butil::memory_order_relaxed);
@@ -393,6 +393,10 @@ int butex_wake_all(void* arg, bool nosignal) {
return nwakeup;
}
+int butex_wake_all(void* arg, bool nosignal) {
+ return butex_wake_n(arg, 0, nosignal);
+}
+
int butex_wake_except(void* arg, bthread_t excluded_bthread) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex,
value);
diff --git a/src/bthread/butex.h b/src/bthread/butex.h
index b40ec1e0..2786ef68 100644
--- a/src/bthread/butex.h
+++ b/src/bthread/butex.h
@@ -48,6 +48,11 @@ void butex_destroy(void* butex);
// Returns # of threads woken up.
int butex_wake(void* butex, bool nosignal = false);
+// Wake up all threads waiting on |butex| if n is zero;
+// otherwise, wake up at most n threads waiting on |butex|.
+// Returns # of threads woken up.
+int butex_wake_n(void* butex, size_t n, bool nosignal = false);
+
// Wake up all threads waiting on |butex|.
// Returns # of threads woken up.
int butex_wake_all(void* butex, bool nosignal = false);
diff --git a/src/bthread/mutex.cpp b/src/bthread/mutex.cpp
index 403f6bb8..fa2f91c6 100644
--- a/src/bthread/mutex.cpp
+++ b/src/bthread/mutex.cpp
@@ -59,7 +59,7 @@ EXTERN_BAIDU_VOLATILE_THREAD_LOCAL(TaskGroup*,
tls_task_group);
const butil::debug::StackTrace ALLOW_UNUSED dummy_bt;
// For controlling contentions collected per second.
-static bvar::CollectorSpeedLimit g_cp_sl =
BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER;
+bvar::CollectorSpeedLimit g_cp_sl = BVAR_COLLECTOR_SPEED_LIMIT_INITIALIZER;
const size_t MAX_CACHED_CONTENTIONS = 512;
// Skip frames which are always same: the unlock function and
submit_contention()
@@ -267,7 +267,7 @@ void ContentionProfiler::flush_to_disk(bool ending) {
// If contention profiler is on, this variable will be set with a valid
// instance. NULL otherwise.
-BAIDU_CACHELINE_ALIGNMENT static ContentionProfiler* g_cp = NULL;
+BAIDU_CACHELINE_ALIGNMENT ContentionProfiler* g_cp = NULL;
// Need this version to solve an issue that non-empty entries left by
// previous contention profilers should be detected and overwritten.
static uint64_t g_cp_version = 0;
@@ -369,13 +369,11 @@ void ContentionProfilerStop() {
LOG(ERROR) << "Contention profiler is not started!";
}
-BUTIL_FORCE_INLINE bool
-is_contention_site_valid(const bthread_contention_site_t& cs) {
- return cs.sampling_range;
+bool is_contention_site_valid(const bthread_contention_site_t& cs) {
+ return bvar::is_sampling_range_valid(cs.sampling_range);
}
-BUTIL_FORCE_INLINE void
-make_contention_site_invalid(bthread_contention_site_t* cs) {
+void make_contention_site_invalid(bthread_contention_site_t* cs) {
cs->sampling_range = 0;
}
@@ -671,13 +669,13 @@ BUTIL_FORCE_INLINE int pthread_mutex_lock_impl(Mutex*
mutex) {
MutexAndContentionSite& entry = fast_alt.list[fast_alt.count++];
entry.mutex = mutex;
csite = &entry.csite;
- if (!sampling_range) {
+ if (!bvar::is_sampling_range_valid(sampling_range)) {
make_contention_site_invalid(&entry.csite);
return pthread_mutex_lock_internal(mutex);
}
}
#endif
- if (!sampling_range) { // don't sample
+ if (!bvar::is_sampling_range_valid(sampling_range)) { // don't sample
return pthread_mutex_lock_internal(mutex);
}
// Lock and monitor the waiting time.
@@ -873,13 +871,14 @@ void FastPthreadMutex::unlock() {
extern "C" {
int bthread_mutex_init(bthread_mutex_t* __restrict m,
- const bthread_mutexattr_t* __restrict) {
+ const bthread_mutexattr_t* __restrict attr) {
bthread::make_contention_site_invalid(&m->csite);
m->butex = bthread::butex_create_checked<unsigned>();
if (!m->butex) {
return ENOMEM;
}
*m->butex = 0;
+ m->enable_csite = NULL == attr ? true : attr->enable_csite;
return 0;
}
@@ -900,35 +899,9 @@ int bthread_mutex_lock_contended(bthread_mutex_t* m) {
return bthread::mutex_lock_contended_impl(m, NULL);
}
-int bthread_mutex_lock(bthread_mutex_t* m) {
- bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
- if (!split->locked.exchange(1, butil::memory_order_acquire)) {
- return 0;
- }
- // Don't sample when contention profiler is off.
- if (!bthread::g_cp) {
- return bthread::mutex_lock_contended_impl(m, NULL);
- }
- // Ask Collector if this (contended) locking should be sampled.
- const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
- if (!sampling_range) { // Don't sample
- return bthread::mutex_lock_contended_impl(m, NULL);
- }
- // Start sampling.
- const int64_t start_ns = butil::cpuwide_time_ns();
- // NOTE: Don't modify m->csite outside lock since multiple threads are
- // still contending with each other.
- const int rc = bthread::mutex_lock_contended_impl(m, NULL);
- if (!rc) { // Inside lock
- m->csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
- m->csite.sampling_range = sampling_range;
- } // else rare
- return rc;
-}
-
-int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
- const struct timespec* __restrict abstime) {
- bthread::MutexInternal* split = (bthread::MutexInternal*)m->butex;
+static int bthread_mutex_lock_impl(bthread_mutex_t* __restrict m,
+ const struct timespec* __restrict abstime) {
+ auto split = (bthread::MutexInternal*)m->butex;
if (!split->locked.exchange(1, butil::memory_order_acquire)) {
return 0;
}
@@ -937,8 +910,9 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
return bthread::mutex_lock_contended_impl(m, abstime);
}
// Ask Collector if this (contended) locking should be sampled.
- const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
- if (!sampling_range) { // Don't sample
+ const size_t sampling_range =
+ m->enable_csite ? bvar::is_collectable(&bthread::g_cp_sl) :
bvar::INVALID_SAMPLING_RANGE;
+ if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample
return bthread::mutex_lock_contended_impl(m, abstime);
}
// Start sampling.
@@ -958,10 +932,20 @@ int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
return rc;
}
+int bthread_mutex_lock(bthread_mutex_t* m) {
+ return bthread_mutex_lock_impl(m, NULL);
+}
+
+int bthread_mutex_timedlock(bthread_mutex_t* __restrict m,
+ const struct timespec* __restrict abstime) {
+ return bthread_mutex_lock_impl(m, abstime);
+}
+
int bthread_mutex_unlock(bthread_mutex_t* m) {
- butil::atomic<unsigned>* whole = (butil::atomic<unsigned>*)m->butex;
+ auto whole = (butil::atomic<unsigned>*)m->butex;
bthread_contention_site_t saved_csite = {0, 0};
- if (bthread::is_contention_site_valid(m->csite)) {
+ bool is_valid = bthread::is_contention_site_valid(m->csite);
+ if (is_valid) {
saved_csite = m->csite;
bthread::make_contention_site_invalid(&m->csite);
}
@@ -971,7 +955,7 @@ int bthread_mutex_unlock(bthread_mutex_t* m) {
return 0;
}
// Wakeup one waiter
- if (!bthread::is_contention_site_valid(saved_csite)) {
+ if (!is_valid) {
bthread::butex_wake(whole);
return 0;
}
@@ -983,6 +967,21 @@ int bthread_mutex_unlock(bthread_mutex_t* m) {
return 0;
}
+int bthread_mutexattr_init(bthread_mutexattr_t* attr) {
+ attr->enable_csite = true;
+ return 0;
+}
+
+int bthread_mutexattr_disable_csite(bthread_mutexattr_t* attr) {
+ attr->enable_csite = false;
+ return 0;
+}
+
+int bthread_mutexattr_destroy(bthread_mutexattr_t* attr) {
+ attr->enable_csite = true;
+ return 0;
+}
+
#ifndef NO_PTHREAD_MUTEX_HOOK
int pthread_mutex_lock(pthread_mutex_t* __mutex) {
return bthread::pthread_mutex_lock_impl(__mutex);
diff --git a/src/bthread/mutex.h b/src/bthread/mutex.h
index ad6d2e5c..f1d1029b 100644
--- a/src/bthread/mutex.h
+++ b/src/bthread/mutex.h
@@ -28,7 +28,7 @@
__BEGIN_DECLS
extern int bthread_mutex_init(bthread_mutex_t* __restrict mutex,
- const bthread_mutexattr_t* __restrict
mutex_attr);
+ const bthread_mutexattr_t* __restrict attr);
extern int bthread_mutex_destroy(bthread_mutex_t* mutex);
extern int bthread_mutex_trylock(bthread_mutex_t* mutex);
extern int bthread_mutex_lock(bthread_mutex_t* mutex);
@@ -48,7 +48,8 @@ public:
Mutex() {
int ec = bthread_mutex_init(&_mutex, NULL);
if (ec != 0) {
- throw std::system_error(std::error_code(ec,
std::system_category()), "Mutex constructor failed");
+ throw std::system_error(std::error_code(ec,
std::system_category()),
+ "Mutex constructor failed");
}
}
~Mutex() { CHECK_EQ(0, bthread_mutex_destroy(&_mutex)); }
@@ -56,11 +57,12 @@ public:
void lock() {
int ec = bthread_mutex_lock(&_mutex);
if (ec != 0) {
- throw std::system_error(std::error_code(ec,
std::system_category()), "Mutex lock failed");
+ throw std::system_error(std::error_code(ec,
std::system_category()),
+ "Mutex lock failed");
}
}
- void unlock() { bthread_mutex_unlock(&_mutex); }
- bool try_lock() { return !bthread_mutex_trylock(&_mutex); }
+ void unlock() { (bthread_mutex_unlock(&_mutex)); }
+ bool try_lock() { return 0 == bthread_mutex_trylock(&_mutex); }
// TODO(chenzhangyi01): Complement interfaces for C++11
private:
DISALLOW_COPY_AND_ASSIGN(Mutex);
@@ -107,7 +109,7 @@ namespace std {
template <> class lock_guard<bthread_mutex_t> {
public:
- explicit lock_guard(bthread_mutex_t & mutex) : _pmutex(&mutex) {
+ explicit lock_guard(bthread_mutex_t& mutex) : _pmutex(&mutex) {
#if !defined(NDEBUG)
const int rc = bthread_mutex_lock(_pmutex);
if (rc) {
diff --git a/src/bthread/rwlock.cpp b/src/bthread/rwlock.cpp
new file mode 100644
index 00000000..e6356683
--- /dev/null
+++ b/src/bthread/rwlock.cpp
@@ -0,0 +1,368 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bvar/collector.h"
+#include "bthread/rwlock.h"
+#include "bthread/butex.h"
+
+namespace bthread {
+
+// A `bthread_rwlock_t' is a reader/writer mutual exclusion lock,
+// which is a bthread implementation of golang RWMutex.
+// The lock can be held by an arbitrary number of readers or a single writer.
+// For details, see
https://github.com/golang/go/blob/master/src/sync/rwmutex.go
+
+// Defined in bthread/mutex.cpp
+class ContentionProfiler;
+extern ContentionProfiler* g_cp;
+extern bvar::CollectorSpeedLimit g_cp_sl;
+extern bool is_contention_site_valid(const bthread_contention_site_t& cs);
+extern void make_contention_site_invalid(bthread_contention_site_t* cs);
+extern void submit_contention(const bthread_contention_site_t& csite, int64_t
now_ns);
+
+// Upper bound on the number of concurrent readers. If more readers than
+// this are ever needed, `int64_t' must be used instead of `int'.
+const int RWLockMaxReaders = 1 << 30;
+
+// For reading.
+// Takes the read lock of `rwlock'. The fast path only bumps `reader_count';
+// when the bumped value is negative the caller waits on `reader_sema' through
+// bthread_sem_timedwait() with `abstime' (rwlock_rdlock() passes NULL).
+// Returns 0 on the fast path, otherwise whatever bthread_sem_timedwait() returns.
+// NOTE(review): a failed wait returns without undoing the fetch_add(1) below;
+// confirm that leaving `reader_count' incremented after e.g. a timeout is intended.
+static int rwlock_rdlock_impl(bthread_rwlock_t* __restrict rwlock,
+                              const struct timespec* __restrict abstime) {
+    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
+        ->fetch_add(1, butil::memory_order_acquire) + 1;
+    // Fast path.
+    if (reader_count >= 0) {
+        CHECK_LT(reader_count, RWLockMaxReaders);
+        return 0;
+    }
+
+    // Slow path.
+    // `reader_count' is negative here; rwlock_wrlock_impl() and rwlock_trywrlock()
+    // are what drive it below zero by subtracting RWLockMaxReaders.
+
+    // Don't sample when contention profiler is off.
+    if (NULL == bthread::g_cp) {
+        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
+    }
+    // Ask Collector if this (contended) locking should be sampled.
+    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
+    if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample.
+        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
+    }
+
+    // Sample.
+    const int64_t start_ns = butil::cpuwide_time_ns();
+    int rc = bthread_sem_timedwait(&rwlock->reader_sema, abstime);
+    const int64_t end_ns = butil::cpuwide_time_ns();
+    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
+    // Submit `csite' for each reader immediately after
+    // owning rdlock to avoid the contention of `csite'.
+    // The sample is submitted whatever the value of `rc' is.
+    bthread::submit_contention(csite, end_ns);
+
+    return rc;
+}
+
+// Read-lock `rwlock' through rwlock_rdlock_impl() with a NULL `abstime'.
+static inline int rwlock_rdlock(bthread_rwlock_t* rwlock) {
+    return rwlock_rdlock_impl(rwlock, NULL);
+}
+
+// Read-lock `rwlock' through rwlock_rdlock_impl(), forwarding `abstime' unchanged.
+static inline int rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
+                                     const struct timespec* __restrict abstime) {
+    return rwlock_rdlock_impl(rwlock, abstime);
+}
+
+// Non-blocking attempt to take the read lock.
+// Returns 0 if the lock was acquired, EBUSY if `reader_count' is negative
+// (a writer is present).
+static inline int rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
+    butil::atomic<int>* const counter =
+        (butil::atomic<int>*)&rwlock->reader_count;
+    for (;;) {
+        int observed = counter->load(butil::memory_order_relaxed);
+        if (observed < 0) {
+            // Failed to acquire the read lock because there is a writer.
+            return EBUSY;
+        }
+        const bool acquired = counter->compare_exchange_weak(
+            observed, observed + 1,
+            butil::memory_order_acquire,
+            butil::memory_order_relaxed);
+        if (acquired) {
+            return 0;
+        }
+        // The counter changed underneath us (or the weak CAS failed
+        // spuriously): reload and try again.
+    }
+}
+
+// Release a read lock held on `rwlock'.
+// Returns 0 on success, or EINVAL (after a CHECK failure) when the counter
+// shows that no read lock was held.
+static inline int rwlock_unrdlock(bthread_rwlock_t* rwlock) {
+    // Release ordering: everything this reader did inside its critical section
+    // must be visible to the writer that later observes this decrement.
+    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
+        ->fetch_add(-1, butil::memory_order_release) - 1;
+    // Fast path.
+    if (reader_count >= 0) {
+        return 0;
+    }
+    // Slow path.
+
+    if (BAIDU_UNLIKELY(reader_count + 1 == 0 ||
+                       reader_count + 1 == -RWLockMaxReaders)) {
+        CHECK(false) << "rwlock_unrdlock of unlocked rwlock";
+        return EINVAL;
+    }
+
+    // A writer is pending.
+    // acq_rel: each departing reader publishes its own critical section and
+    // picks up those of the readers that left before it, so that the last
+    // reader hands all of them over to the writer.
+    int reader_wait = ((butil::atomic<int>*)&rwlock->reader_wait)
+        ->fetch_add(-1, butil::memory_order_acq_rel) - 1;
+    if (reader_wait != 0) {
+        return 0;
+    }
+
+    // The last reader unblocks the writer.
+
+    if (NULL == bthread::g_cp) {
+        bthread_sem_post(&rwlock->writer_sema);
+        return 0;
+    }
+    // Ask Collector if this (contended) locking should be sampled.
+    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
+    if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample
+        bthread_sem_post(&rwlock->writer_sema);
+        return 0;
+    }
+
+    // Sampling.
+    const int64_t start_ns = butil::cpuwide_time_ns();
+    bthread_sem_post(&rwlock->writer_sema);
+    const int64_t end_ns = butil::cpuwide_time_ns();
+    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
+    // Submit `csite' for each reader immediately after
+    // releasing rdlock to avoid the contention of `csite'.
+    bthread::submit_contention(csite, end_ns);
+    return 0;
+}
+
+// Helper of rwlock_wrlock_impl(): decide whether the current (contended)
+// attempt is sampled. Writes the locals `sampling_range' and `start_ns' of the
+// enclosing function; `start_ns' becomes -1 when the attempt is not sampled.
+#define DO_CSITE_IF_NEED \
+    do { \
+        /* Don't sample when contention profiler is off. */ \
+        if (NULL != bthread::g_cp) { \
+            /* Ask Collector if this (contended) locking should be sampled. */ \
+            sampling_range = bvar::is_collectable(&bthread::g_cp_sl); \
+            start_ns = bvar::is_sampling_range_valid(sampling_range) ? \
+                butil::cpuwide_time_ns() : -1; \
+        } else { \
+            start_ns = -1; \
+        } \
+    } while (0)
+
+// Helper of rwlock_wrlock_impl(): when a sampled attempt ends with ETIMEDOUT,
+// submit the time spent so far. Reads the locals `rc', `start_ns' and
+// `sampling_range' of the enclosing function.
+#define SUBMIT_CSITE_IF_NEED \
+    do { \
+        if (ETIMEDOUT == rc && start_ns > 0) { \
+            /* Failed to lock due to ETIMEDOUT, submit the elapse directly. */ \
+            const int64_t end_ns = butil::cpuwide_time_ns(); \
+            const bthread_contention_site_t csite{end_ns - start_ns, sampling_range}; \
+            bthread::submit_contention(csite, end_ns); \
+        } \
+    } while (0)
+
+// For writing.
+// Takes the write lock of `rwlock': first `write_queue_mutex' is locked, then
+// RWLockMaxReaders is subtracted from `reader_count' and, if readers were
+// counted, the caller waits on `writer_sema'. `abstime' is forwarded to the
+// timed waits (rwlock_wrlock() passes NULL).
+// Returns 0 on success, otherwise the error of the wait that failed.
+static inline int rwlock_wrlock_impl(bthread_rwlock_t* __restrict rwlock,
+                                     const struct timespec* __restrict abstime) {
+    // First, resolve competition with other writers.
+    int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex);
+    size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
+    // -1: don't sample.
+    // 0: default value.
+    // > 0: Start time of sampling.
+    int64_t start_ns = 0;
+    if (0 != rc) {
+        DO_CSITE_IF_NEED;
+
+        rc = bthread_mutex_timedlock(&rwlock->write_queue_mutex, abstime);
+        if (0 != rc) {
+            SUBMIT_CSITE_IF_NEED;
+            return rc;
+        }
+    }
+
+    // Announce to readers there is a pending writer.
+    // acq_rel: besides publishing the announcement, this is the operation that
+    // acquires the lock when no reader is active, so it must also observe what
+    // earlier readers released.
+    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
+        ->fetch_add(-RWLockMaxReaders, butil::memory_order_acq_rel);
+    // Wait for active readers.
+    if (reader_count != 0 &&
+        ((butil::atomic<int>*)&rwlock->reader_wait)
+            ->fetch_add(reader_count) + reader_count != 0) {
+        rc = bthread_sem_trywait(&rwlock->writer_sema);
+        if (0 != rc) {
+            // Only ask the collector once per call.
+            if (0 == start_ns) {
+                DO_CSITE_IF_NEED;
+            }
+
+            rc = bthread_sem_timedwait(&rwlock->writer_sema, abstime);
+            if (0 != rc) {
+                // NOTE(review): this path returns without undoing the
+                // subtraction from `reader_count' or the addition to
+                // `reader_wait' made above; confirm the lock stays usable
+                // after a timed-out write lock.
+                SUBMIT_CSITE_IF_NEED;
+                bthread_mutex_unlock(&rwlock->write_queue_mutex);
+                return rc;
+            }
+        }
+    }
+    if (start_ns > 0) {
+        // Stored in the lock; rwlock_unwrlock() submits it.
+        rwlock->writer_csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
+        rwlock->writer_csite.sampling_range = sampling_range;
+    }
+    rwlock->wlock_flag = true;
+    return 0;
+}
+#undef DO_CSITE_IF_NEED
+#undef SUBMIT_CSITE_IF_NEED
+
+// Write-lock `rwlock' through rwlock_wrlock_impl() with a NULL `abstime'.
+static inline int rwlock_wrlock(bthread_rwlock_t* rwlock) {
+    return rwlock_wrlock_impl(rwlock, NULL);
+}
+
+// Write-lock `rwlock' through rwlock_wrlock_impl(), forwarding `abstime' unchanged.
+static inline int rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
+                                     const struct timespec* __restrict abstime) {
+    return rwlock_wrlock_impl(rwlock, abstime);
+}
+
+// Non-blocking attempt to take the write lock.
+// Returns 0 on success, the error of bthread_mutex_trylock() when
+// `write_queue_mutex' cannot be taken, or EBUSY when `reader_count' is not 0.
+static inline int rwlock_trywrlock(bthread_rwlock_t* rwlock) {
+    const int queue_rc = bthread_mutex_trylock(&rwlock->write_queue_mutex);
+    if (0 != queue_rc) {
+        return queue_rc;
+    }
+
+    butil::atomic<int>* const counter =
+        (butil::atomic<int>*)&rwlock->reader_count;
+    int no_readers = 0;
+    const bool claimed = counter->compare_exchange_strong(
+        no_readers, -RWLockMaxReaders,
+        butil::memory_order_acquire,
+        butil::memory_order_relaxed);
+    if (claimed) {
+        rwlock->wlock_flag = true;
+        return 0;
+    }
+
+    // Failed to acquire the write lock because there are active readers.
+    bthread_mutex_unlock(&rwlock->write_queue_mutex);
+    return EBUSY;
+}
+
+// Second half of the write unlock: post `reader_sema' `reader_count' times,
+// then unlock `write_queue_mutex'.
+static inline void rwlock_unwrlock_slow(bthread_rwlock_t* rwlock, int reader_count) {
+    // `reader_count' is the counter value computed by rwlock_unwrlock().
+    bthread_sem_post_n(&rwlock->reader_sema, reader_count);
+    // Allow other writers to proceed.
+    bthread_mutex_unlock(&rwlock->write_queue_mutex);
+}
+
+// Release the write lock held on `rwlock'.
+// Returns 0 on success, or EINVAL (after a CHECK failure) when the counter
+// shows that no write lock was held.
+static inline int rwlock_unwrlock(bthread_rwlock_t* rwlock) {
+    rwlock->wlock_flag = false;
+
+    // Announce to readers there is no active writer.
+    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)->fetch_add(
+        RWLockMaxReaders, butil::memory_order_release) + RWLockMaxReaders;
+    if (BAIDU_UNLIKELY(reader_count >= RWLockMaxReaders)) {
+        CHECK(false) << "rwlock_unwrlock of unlocked rwlock";
+        return EINVAL;
+    }
+
+    // `writer_csite' is valid only when rwlock_wrlock_impl() sampled this
+    // acquisition; in that case the time spent unlocking is added to it
+    // before it is submitted.
+    bool is_valid = bthread::is_contention_site_valid(rwlock->writer_csite);
+    if (BAIDU_UNLIKELY(is_valid)) {
+        // Copy and invalidate before `write_queue_mutex' is released.
+        bthread_contention_site_t saved_csite = rwlock->writer_csite;
+        bthread::make_contention_site_invalid(&rwlock->writer_csite);
+
+        const int64_t unlock_start_ns = butil::cpuwide_time_ns();
+        rwlock_unwrlock_slow(rwlock, reader_count);
+        const int64_t unlock_end_ns = butil::cpuwide_time_ns();
+        saved_csite.duration_ns += unlock_end_ns - unlock_start_ns;
+        bthread::submit_contention(saved_csite, unlock_end_ns);
+    } else {
+        rwlock_unwrlock_slow(rwlock, reader_count);
+    }
+
+    return 0;
+}
+
+// Release `rwlock': `wlock_flag' selects between the write unlock and the
+// read unlock.
+static inline int rwlock_unlock(bthread_rwlock_t* rwlock) {
+    return rwlock->wlock_flag ? rwlock_unwrlock(rwlock)
+                              : rwlock_unrdlock(rwlock);
+}
+
+} // namespace bthread
+
+__BEGIN_DECLS
+
+// Initialize `rwlock': both semaphores start at 0, the counters at 0, and
+// contention sampling is disabled on the inner semaphores and mutex.
+// The attribute argument is ignored.
+// Returns 0 on success, otherwise the error of the failing init call; what
+// was already initialized is destroyed again.
+int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
+                        const bthread_rwlockattr_t* __restrict) {
+    int rc = bthread_sem_init(&rwlock->reader_sema, 0);
+    if (BAIDU_UNLIKELY(0 != rc)) {
+        return rc;
+    }
+    bthread_sem_disable_csite(&rwlock->reader_sema);
+    rc = bthread_sem_init(&rwlock->writer_sema, 0);
+    if (BAIDU_UNLIKELY(0 != rc)) {
+        bthread_sem_destroy(&rwlock->reader_sema);
+        return rc;
+    }
+    bthread_sem_disable_csite(&rwlock->writer_sema);
+
+    rwlock->reader_count = 0;
+    rwlock->reader_wait = 0;
+    rwlock->wlock_flag = false;
+
+    bthread_mutexattr_t attr;
+    bthread_mutexattr_init(&attr);
+    bthread_mutexattr_disable_csite(&attr);
+    rc = bthread_mutex_init(&rwlock->write_queue_mutex, &attr);
+    // The attribute is only needed by bthread_mutex_init(); pair the init
+    // with its destroy on the failure path as well.
+    bthread_mutexattr_destroy(&attr);
+    if (BAIDU_UNLIKELY(0 != rc)) {
+        bthread_sem_destroy(&rwlock->reader_sema);
+        bthread_sem_destroy(&rwlock->writer_sema);
+        return rc;
+    }
+
+    bthread::make_contention_site_invalid(&rwlock->writer_csite);
+
+    return 0;
+}
+
+// Destroy the two semaphores and the writer mutex of `rwlock'.
+// Always returns 0; the results of the individual destroy calls are ignored.
+int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) {
+    bthread_sem_destroy(&rwlock->reader_sema);
+    bthread_sem_destroy(&rwlock->writer_sema);
+    bthread_mutex_destroy(&rwlock->write_queue_mutex);
+    return 0;
+}
+
+// C API entry point; forwards to bthread::rwlock_rdlock().
+int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) {
+    return bthread::rwlock_rdlock(rwlock);
+}
+
+// C API entry point; forwards to bthread::rwlock_tryrdlock().
+int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
+    return bthread::rwlock_tryrdlock(rwlock);
+}
+
+// C API entry point; forwards to bthread::rwlock_timedrdlock().
+int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
+                               const struct timespec* __restrict abstime) {
+    return bthread::rwlock_timedrdlock(rwlock, abstime);
+}
+
+// C API entry point; forwards to bthread::rwlock_wrlock().
+int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) {
+    return bthread::rwlock_wrlock(rwlock);
+}
+
+// C API entry point; forwards to bthread::rwlock_trywrlock().
+int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock) {
+    return bthread::rwlock_trywrlock(rwlock);
+}
+
+// C API entry point; forwards to bthread::rwlock_timedwrlock().
+int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
+                               const struct timespec* __restrict abstime) {
+    return bthread::rwlock_timedwrlock(rwlock, abstime);
+}
+
+// C API entry point; forwards to bthread::rwlock_unlock(), which picks the
+// read or the write unlock.
+int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) {
+    return bthread::rwlock_unlock(rwlock);
+}
+
+__END_DECLS
diff --git a/src/bthread/rwlock.h b/src/bthread/rwlock.h
new file mode 100644
index 00000000..a2708b99
--- /dev/null
+++ b/src/bthread/rwlock.h
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bthread/types.h"
+#include "bthread/bthread.h"
+#include "butil/scoped_lock.h"
+
+namespace bthread {
+
+// The C++ Wrapper of bthread_rwlock
+
+// NOTE: Not aligned to cacheline as the container of RWLock is practically
aligned.
+
+// Owns a bthread_rwlock_t: initialized in the constructor, destroyed in the
+// destructor. The blocking operations throw std::system_error on failure,
+// the try_/timed_ variants report success as a bool.
+class RWLock {
+public:
+    using native_handler_type = bthread_rwlock_t*;
+
+    // Throws std::system_error if bthread_rwlock_init() fails.
+    RWLock() {
+        throw_if_failed(bthread_rwlock_init(&_rwlock, NULL),
+                        "RWLock constructor failed");
+    }
+
+    ~RWLock() {
+        CHECK_EQ(0, bthread_rwlock_destroy(&_rwlock));
+    }
+
+    DISALLOW_COPY_AND_ASSIGN(RWLock);
+
+    // Pointer to the wrapped bthread_rwlock_t.
+    native_handler_type native_handler() { return &_rwlock; }
+
+    // Take the read lock; throws std::system_error on failure.
+    void rdlock() {
+        throw_if_failed(bthread_rwlock_rdlock(&_rwlock),
+                        "RWLock rdlock failed");
+    }
+
+    // True iff the read lock was taken without waiting.
+    bool try_rdlock() {
+        const int rc = bthread_rwlock_tryrdlock(&_rwlock);
+        return 0 == rc;
+    }
+
+    // True iff bthread_rwlock_timedrdlock() succeeded.
+    bool timed_rdlock(const struct timespec* abstime) {
+        const int rc = bthread_rwlock_timedrdlock(&_rwlock, abstime);
+        return 0 == rc;
+    }
+
+    // Take the write lock; throws std::system_error on failure.
+    void wrlock() {
+        throw_if_failed(bthread_rwlock_wrlock(&_rwlock),
+                        "RWLock wrlock failed");
+    }
+
+    // True iff the write lock was taken without waiting.
+    bool try_wrlock() {
+        const int rc = bthread_rwlock_trywrlock(&_rwlock);
+        return 0 == rc;
+    }
+
+    // True iff bthread_rwlock_timedwrlock() succeeded.
+    bool timed_wrlock(const struct timespec* abstime) {
+        const int rc = bthread_rwlock_timedwrlock(&_rwlock, abstime);
+        return 0 == rc;
+    }
+
+    // Release whichever lock is held; the result is ignored.
+    void unlock() { bthread_rwlock_unlock(&_rwlock); }
+
+private:
+    // Turn a non-zero error code into a std::system_error carrying `what'.
+    static void throw_if_failed(int rc, const char* what) {
+        if (0 != rc) {
+            throw std::system_error(
+                std::error_code(rc, std::system_category()), what);
+        }
+    }
+
+    bthread_rwlock_t _rwlock{};
+};
+
+// Read lock guard of rwlock.
+// Read-locks in the constructor and unlocks in the destructor.
+// When NDEBUG is not defined, a failed lock is reported with LOG(FATAL) and
+// `_rwlock' is reset to NULL so that the destructor skips the unlock; with
+// NDEBUG the result of the lock call is ignored.
+class RWLockRdGuard {
+public:
+    explicit RWLockRdGuard(bthread_rwlock_t& rwlock)
+        : _rwlock(&rwlock) {
+#if !defined(NDEBUG)
+        const int rc = bthread_rwlock_rdlock(_rwlock);
+        if (rc) {
+            LOG(FATAL) << "Fail to rdlock bthread_rwlock_t=" << _rwlock << ", " << berror(rc);
+            _rwlock = NULL;
+        }
+#else
+        bthread_rwlock_rdlock(_rwlock);
+#endif // NDEBUG
+    }
+
+    // Guard the bthread_rwlock_t wrapped by `rwlock'.
+    explicit RWLockRdGuard(RWLock& rwlock)
+        : RWLockRdGuard(*rwlock.native_handler()) {}
+
+    ~RWLockRdGuard() {
+#ifndef NDEBUG
+        // NULL means the lock attempt in the constructor failed.
+        if (NULL != _rwlock) {
+            bthread_rwlock_unlock(_rwlock);
+        }
+#else
+        bthread_rwlock_unlock(_rwlock);
+#endif // NDEBUG
+    }
+
+    DISALLOW_COPY_AND_ASSIGN(RWLockRdGuard);
+
+private:
+    bthread_rwlock_t* _rwlock;
+};
+
+// Write lock guard of rwlock.
+// Write-locks in the constructor and unlocks in the destructor.
+// When NDEBUG is not defined, a failed lock is reported with LOG(FATAL) and
+// `_rwlock' is reset to NULL so that the destructor skips the unlock; with
+// NDEBUG the result of the lock call is ignored.
+class RWLockWrGuard {
+public:
+    explicit RWLockWrGuard(bthread_rwlock_t& rwlock)
+        : _rwlock(&rwlock) {
+#if !defined(NDEBUG)
+        const int rc = bthread_rwlock_wrlock(_rwlock);
+        if (rc) {
+            LOG(FATAL) << "Fail to wrlock bthread_rwlock_t=" << _rwlock << ", " << berror(rc);
+            _rwlock = NULL;
+        }
+#else
+        bthread_rwlock_wrlock(_rwlock);
+#endif // NDEBUG
+    }
+
+    // Guard the bthread_rwlock_t wrapped by `rwlock'.
+    explicit RWLockWrGuard(RWLock& rwlock)
+        : RWLockWrGuard(*rwlock.native_handler()) {}
+
+    ~RWLockWrGuard() {
+#ifndef NDEBUG
+        // NULL means the lock attempt in the constructor failed.
+        if (NULL != _rwlock) {
+            bthread_rwlock_unlock(_rwlock);
+        }
+#else
+        bthread_rwlock_unlock(_rwlock);
+#endif // NDEBUG
+    }
+
+    DISALLOW_COPY_AND_ASSIGN(RWLockWrGuard);
+
+private:
+    bthread_rwlock_t* _rwlock;
+};
+
+} // namespace bthread
+
+namespace std {
+
+// std::lock_guard for a raw bthread_rwlock_t. `read' selects the read lock
+// (true) or the write lock (false); the destructor unlocks.
+// When NDEBUG is not defined, a failed lock is reported with LOG(FATAL) and
+// `_rwlock' is reset to NULL so that the destructor skips the unlock; with
+// NDEBUG the result of the lock call is ignored.
+template <>
+class lock_guard<bthread_rwlock_t> {
+public:
+    lock_guard(bthread_rwlock_t& rwlock, bool read)
+        :_rwlock(&rwlock), _read(read) {
+#if !defined(NDEBUG)
+        int rc;
+        if (_read) {
+            rc = bthread_rwlock_rdlock(_rwlock);
+        } else {
+            rc = bthread_rwlock_wrlock(_rwlock);
+        }
+        if (rc) {
+            LOG(FATAL) << "Fail to lock bthread_rwlock_t=" << _rwlock << ", "
+                       << berror(rc);
+            _rwlock = NULL;
+        }
+#else
+        if (_read) {
+            bthread_rwlock_rdlock(_rwlock);
+        } else {
+            bthread_rwlock_wrlock(_rwlock);
+        }
+#endif // NDEBUG
+    }
+
+    ~lock_guard() {
+#ifndef NDEBUG
+        // NULL means the lock attempt in the constructor failed.
+        if (NULL != _rwlock) {
+            bthread_rwlock_unlock(_rwlock);
+        }
+#else
+        bthread_rwlock_unlock(_rwlock);
+#endif // NDEBUG
+    }
+
+    DISALLOW_COPY_AND_ASSIGN(lock_guard);
+
+private:
+    bthread_rwlock_t* _rwlock;
+    // Recorded at construction: true for a read lock, false for a write lock.
+    bool _read;
+};
+
+// std::lock_guard for bthread::RWLock; delegates to the bthread_rwlock_t
+// specialization on the lock's native handle. `read' selects the read lock
+// (true) or the write lock (false).
+template <>
+class lock_guard<bthread::RWLock> {
+public:
+    lock_guard(bthread::RWLock& rwlock, bool read)
+        :_rwlock_guard(*rwlock.native_handler(), read) {}
+
+    DISALLOW_COPY_AND_ASSIGN(lock_guard);
+
+private:
+    // Does the actual locking and unlocking.
+    std::lock_guard<bthread_rwlock_t> _rwlock_guard;
+};
+
+} // namespace std
diff --git a/src/bthread/semaphore.cpp b/src/bthread/semaphore.cpp
new file mode 100644
index 00000000..3813a8a6
--- /dev/null
+++ b/src/bthread/semaphore.cpp
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "butil/memory/scope_guard.h"
+#include "bvar/collector.h"
+#include "bthread/bthread.h"
+#include "bthread/butex.h"
+
+namespace bthread {
+
+// Defined in bthread/mutex.cpp.
+// Contention-profiler state and helpers used below to sample and report
+// time spent in semaphore operations.
+class ContentionProfiler;
+extern ContentionProfiler* g_cp;
+extern bvar::CollectorSpeedLimit g_cp_sl;
+extern bool is_contention_site_valid(const bthread_contention_site_t& cs);
+extern void make_contention_site_invalid(bthread_contention_site_t* cs);
+extern void submit_contention(const bthread_contention_site_t& csite, int64_t now_ns);
+
+// Non-blocking acquire: decrements the counter stored in sema->butex if it
+// is positive.
+// Returns 0 when the counter was decremented, EAGAIN when it was observed
+// to be zero.
+static inline int bthread_sem_trywait(bthread_sem_t* sema) {
+    butil::atomic<unsigned>* const counter =
+        reinterpret_cast<butil::atomic<unsigned>*>(sema->butex);
+    // Re-read the counter after every failed CAS and give up as soon as it
+    // is observed to be zero.
+    for (unsigned current = counter->load(butil::memory_order_relaxed);
+         current != 0;
+         current = counter->load(butil::memory_order_relaxed)) {
+        if (counter->compare_exchange_weak(current, current - 1,
+                                           butil::memory_order_acquire,
+                                           butil::memory_order_relaxed)) {
+            return 0;
+        }
+    }
+    return EAGAIN;
+}
+
+// Blocking acquire shared by bthread_sem_wait() and bthread_sem_timedwait().
+// Loops until the counter stored in sem->butex can be decremented, sleeping
+// on the butex while it is zero.
+//   sem:     semaphore to acquire.
+//   abstime: passed through to butex_wait(); bthread_sem_wait() passes NULL.
+// Returns 0 once the counter was decremented, otherwise the errno reported
+// by butex_wait() (EWOULDBLOCK and EINTR are retried, not returned).
+// When the contention profiler is on and sampling is enabled for `sem', the
+// time spent waiting is submitted on success and on ETIMEDOUT.
+static int bthread_sem_wait_impl(bthread_sem_t* sem, const struct timespec* abstime) {
+    bool queue_lifo = false;
+    bool first_wait = true;
+    size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
+    // -1: don't sample.
+    // 0: default value.
+    // > 0: Start time of sampling.
+    int64_t start_ns = 0;
+    auto whole = (butil::atomic<unsigned>*)sem->butex;
+    while (true) {
+        unsigned num = whole->load(butil::memory_order_relaxed);
+        if (num > 0) {
+            if (whole->compare_exchange_weak(num, num - 1,
+                                             butil::memory_order_acquire,
+                                             butil::memory_order_relaxed)) {
+                if (start_ns > 0) {
+                    const int64_t end_ns = butil::cpuwide_time_ns();
+                    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
+                    bthread::submit_contention(csite, end_ns);
+                }
+
+                return 0;
+            }
+        }
+        // Decide whether to sample exactly once, the first time this call
+        // has to wait. Later iterations must keep the decision: resetting
+        // start_ns here would drop the sample of every waiter that needs
+        // more than one round to get the semaphore.
+        if (start_ns == 0) {
+            // Don't sample when contention profiler is off.
+            if (NULL != bthread::g_cp && sem->enable_csite) {
+                // Ask Collector if this (contended) sem waiting should be sampled.
+                sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
+                start_ns = bvar::is_sampling_range_valid(sampling_range) ?
+                    butil::cpuwide_time_ns() : -1;
+            } else {
+                start_ns = -1;
+            }
+        }
+        const int rc = bthread::butex_wait(sem->butex, 0, abstime, queue_lifo);
+        if (rc < 0 && errno != EWOULDBLOCK && errno != EINTR) {
+            // A sema should ignore interruptions in general since
+            // user code is unlikely to check the return value.
+            // Save errno first: the calls below may overwrite it.
+            const int saved_errno = errno;
+            if (ETIMEDOUT == saved_errno && start_ns > 0) {
+                // Failed to lock due to ETIMEDOUT, submit the elapse directly.
+                const int64_t end_ns = butil::cpuwide_time_ns();
+                const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
+                bthread::submit_contention(csite, end_ns);
+            }
+
+            return saved_errno;
+        }
+        // Ignore EWOULDBLOCK and EINTR.
+        // Use the return value rather than errno to detect a real wake-up:
+        // errno is not meaningful after a successful call.
+        if (first_wait && 0 == rc) {
+            first_wait = false;
+        }
+        if (!first_wait) {
+            // Normally, bthreads are queued in FIFO order. But competing with new
+            // arriving bthreads over sema, a woken up bthread has good chances of
+            // losing. Because new arriving bthreads are already running on CPU and
+            // there can be lots of them. In such case, for fairness, to avoid
+            // starvation, it is queued at the head of the waiter queue.
+            queue_lifo = true;
+        }
+    }
+}
+
+// Adds `num' to the counter stored in sem->butex and wakes waiters.
+// Does nothing when `num' is 0. Always returns 0.
+// When the contention profiler is on and sampling is enabled for `sem', the
+// time spent in butex_wake_n() may be sampled and submitted.
+static inline int bthread_sem_post(bthread_sem_t* sem, size_t num) {
+    if (num > 0) {
+        // Release ordering pairs with the acquire CAS in the wait paths, so
+        // writes made before the post are visible to the bthread that
+        // obtains the semaphore.
+        unsigned n = ((butil::atomic<unsigned>*)sem->butex)
+            ->fetch_add(num, butil::memory_order_release);
+        const size_t sampling_range = NULL != bthread::g_cp && sem->enable_csite ?
+            bvar::is_collectable(&bthread::g_cp_sl) : bvar::INVALID_SAMPLING_RANGE;
+        const int64_t start_ns = bvar::is_sampling_range_valid(sampling_range) ?
+            butil::cpuwide_time_ns() : -1;
+        // NOTE(review): `n' is the counter value before the add, not `num'.
+        // Presumably butex_wake_n() treats 0 as "wake all" -- confirm against
+        // butex.cpp before changing the wake count.
+        bthread::butex_wake_n(sem->butex, n);
+        if (start_ns > 0) {
+            const int64_t end_ns = butil::cpuwide_time_ns();
+            const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
+            bthread::submit_contention(csite, end_ns);
+        }
+    }
+    return 0;
+}
+
+} // namespace bthread
+
+__BEGIN_DECLS
+
+// Initializes `sem' with a freshly created butex holding `value' and turns
+// contention-site sampling on.
+// Returns 0 on success, ENOMEM when the butex could not be created.
+int bthread_sem_init(bthread_sem_t* sem, unsigned value) {
+    unsigned* const counter = bthread::butex_create_checked<unsigned>();
+    sem->butex = counter;
+    if (NULL == counter) {
+        return ENOMEM;
+    }
+    *counter = value;
+    sem->enable_csite = true;
+    return 0;
+}
+
+// Clears the enable_csite flag of `sema', which the wait and post paths
+// check before sampling contention. Always returns 0.
+int bthread_sem_disable_csite(bthread_sem_t* sema) {
+ sema->enable_csite = false;
+ return 0;
+}
+
+// Destroys the butex backing `semaphore' through bthread::butex_destroy().
+// Always returns 0. The butex pointer stored in `semaphore' is not reset.
+int bthread_sem_destroy(bthread_sem_t* semaphore) {
+ bthread::butex_destroy(semaphore->butex);
+ return 0;
+}
+
+// C entry point: forwards to bthread::bthread_sem_trywait() and returns its
+// result unchanged.
+int bthread_sem_trywait(bthread_sem_t* sem) {
+ return bthread::bthread_sem_trywait(sem);
+}
+
+// C entry point: calls bthread::bthread_sem_wait_impl() with a NULL abstime
+// and returns its result unchanged.
+int bthread_sem_wait(bthread_sem_t* sem) {
+ return bthread::bthread_sem_wait_impl(sem, NULL);
+}
+
+// C entry point: calls bthread::bthread_sem_wait_impl() with `abstime' and
+// returns its result unchanged.
+int bthread_sem_timedwait(bthread_sem_t* sem, const struct timespec* abstime) {
+ return bthread::bthread_sem_wait_impl(sem, abstime);
+}
+
+// C entry point: posts once by calling bthread::bthread_sem_post() with a
+// count of 1.
+int bthread_sem_post(bthread_sem_t* sem) {
+ return bthread::bthread_sem_post(sem, 1);
+}
+
+// C entry point: posts `n' times at once by calling
+// bthread::bthread_sem_post() with a count of `n'.
+int bthread_sem_post_n(bthread_sem_t* sem, size_t n) {
+ return bthread::bthread_sem_post(sem, n);
+}
+
+__END_DECLS
\ No newline at end of file
diff --git a/src/bthread/types.h b/src/bthread/types.h
index 0aad64c4..d177ea72 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -172,9 +172,11 @@ typedef struct bthread_mutex_t {
#endif
unsigned* butex;
bthread_contention_site_t csite;
+ bool enable_csite;
} bthread_mutex_t;
+// Attributes of a bthread mutex.
+// NOTE(review): enable_csite presumably mirrors bthread_mutex_t::enable_csite
+// (contention-site sampling switch) -- confirm in bthread/mutex.cpp.
typedef struct {
+ bool enable_csite;
} bthread_mutexattr_t;
typedef struct bthread_cond_t {
@@ -190,6 +192,18 @@ typedef struct {
} bthread_condattr_t;
+// Semaphore for bthreads.
+// butex: the semaphore counter; waiters block on it while it is zero.
+// enable_csite: whether wait/post may sample contention for this semaphore.
typedef struct {
+ unsigned* butex;
+ bool enable_csite;
+} bthread_sem_t;
+
+// Read-write lock for bthreads.
+typedef struct {
+    // Semaphore for readers to wait for completing writers.
+    bthread_sem_t reader_sema;
+    // Semaphore for writers to wait for completing readers.
+    bthread_sem_t writer_sema;
+    // Number of pending readers.
+    int reader_count;
+    // Number of departing readers.
+    int reader_wait;
+    // Flag used to indicate that a write lock is held.
+    bool wlock_flag;
+    // Held if there are pending writers.
+    bthread_mutex_t write_queue_mutex;
+    bthread_contention_site_t writer_csite;
} bthread_rwlock_t;
typedef struct {
diff --git a/src/bvar/collector.h b/src/bvar/collector.h
index 56db7214..a603d96b 100644
--- a/src/bvar/collector.h
+++ b/src/bvar/collector.h
@@ -28,6 +28,12 @@
namespace bvar {
+// Sampling range value that means "do not sample".
+static const size_t INVALID_SAMPLING_RANGE = 0;
+
+// Returns true iff `sampling_range' is not INVALID_SAMPLING_RANGE.
+inline bool is_sampling_range_valid(size_t sampling_range) {
+    return sampling_range != INVALID_SAMPLING_RANGE;
+}
+
// Containing the context for limiting sampling speed.
struct CollectorSpeedLimit {
// [Managed by Collector, don't change!]
@@ -115,7 +121,7 @@ inline size_t is_collectable(CollectorSpeedLimit*
speed_limit) {
const size_t sampling_range = speed_limit->sampling_range;
// fast_rand is faster than fast_rand_in
if ((butil::fast_rand() & (COLLECTOR_SAMPLING_BASE - 1)) >=
sampling_range) {
- return 0;
+ return INVALID_SAMPLING_RANGE;
}
return sampling_range;
}
diff --git a/test/BUILD.bazel b/test/BUILD.bazel
index d9af2ae7..b8c3b3d2 100644
--- a/test/BUILD.bazel
+++ b/test/BUILD.bazel
@@ -226,6 +226,8 @@ cc_test(
# glog CHECK die with a fatal error
"bthread_key_unittest.cpp",
"bthread_butex_multi_tag_unittest.cpp",
+ "bthread_rwlock_unittest.cpp",
+ "bthread_semaphore_unittest.cpp",
],
),
copts = COPTS,
diff --git a/test/bthread_rwlock_unittest.cpp b/test/bthread_rwlock_unittest.cpp
index 60cbfbe2..3318956b 100644
--- a/test/bthread_rwlock_unittest.cpp
+++ b/test/bthread_rwlock_unittest.cpp
@@ -15,15 +15,394 @@
// specific language governing permissions and limitations
// under the License.
-#include <stdlib.h>
-#include <unistd.h>
-#include <stdio.h>
-#include <signal.h>
#include <gtest/gtest.h>
-#include "butil/time.h"
-#include "butil/macros.h"
+#include <butil/gperftools_profiler.h>
+#include <bthread/rwlock.h>
namespace {
+
+// Time base (in ms) for the elapsed time printed by rdlocker/wrlocker.
+long start_time = butil::cpuwide_time_ms();
+// Counter bumped and printed by rdlocker/wrlocker.
+// NOTE(review): it is a plain int and rdlocker increments it while holding
+// only the read lock, so concurrent readers may race on it.
+int c = 0;
+// Thread entry: takes the read lock of the bthread_rwlock_t passed in `arg',
+// logs the thread id, the bumped global counter and the elapsed time, sleeps
+// 10000us while holding the lock, then releases it. Always returns NULL.
+void* rdlocker(void* arg) {
+    bthread_rwlock_t* const lock = static_cast<bthread_rwlock_t*>(arg);
+    bthread_rwlock_rdlock(lock);
+    LOG(INFO) <<butil::string_printf("[%" PRIu64 "] I'm rdlocker, %d, %" PRId64 "ms\n",
+                                     pthread_numeric_id(), ++c,
+                                     butil::cpuwide_time_ms() - start_time);
+    bthread_usleep(10000);
+    bthread_rwlock_unlock(lock);
+    return NULL;
+}
+
+// Thread entry: takes the write lock of the bthread_rwlock_t passed in `arg',
+// logs the thread id, the bumped global counter and the elapsed time, sleeps
+// 10000us while holding the lock, then releases it. Always returns NULL.
+void* wrlocker(void* arg) {
+    bthread_rwlock_t* const lock = static_cast<bthread_rwlock_t*>(arg);
+    bthread_rwlock_wrlock(lock);
+    LOG(INFO) << butil::string_printf("[%" PRIu64 "] I'm wrlocker, %d, %" PRId64 "ms\n",
+                                      pthread_numeric_id(), ++c,
+                                      butil::cpuwide_time_ms() - start_time);
+    bthread_usleep(10000);
+    bthread_rwlock_unlock(lock);
+    return NULL;
+}
+
+// Basic lock/unlock round trips from the test thread, followed by one reader
+// bthread and one writer bthread sharing the same lock.
+TEST(RWLockTest, sanity) {
+    bthread_rwlock_t lock;
+    ASSERT_EQ(0, bthread_rwlock_init(&lock, NULL));
+
+    // Uncontended read lock, then uncontended write lock.
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&lock));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&lock));
+    ASSERT_EQ(0, bthread_rwlock_wrlock(&lock));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&lock));
+
+    bthread_t reader;
+    bthread_t writer;
+    ASSERT_EQ(0, bthread_start_urgent(&reader, NULL, rdlocker, &lock));
+    ASSERT_EQ(0, bthread_start_urgent(&writer, NULL, wrlocker, &lock));
+
+    ASSERT_EQ(0, bthread_join(reader, NULL));
+    ASSERT_EQ(0, bthread_join(writer, NULL));
+    ASSERT_EQ(0, bthread_rwlock_destroy(&lock));
+}
+
+// Runs 8 reader pthreads and 8 writer pthreads against one bthread rwlock
+// and waits for all of them.
+TEST(RWLockTest, used_in_pthread) {
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+    pthread_t rdth[8];
+    pthread_t wrth[8];
+    for (size_t i = 0; i < ARRAY_SIZE(rdth); ++i) {
+        ASSERT_EQ(0, pthread_create(&rdth[i], NULL, rdlocker, &rw));
+    }
+    for (size_t i = 0; i < ARRAY_SIZE(wrth); ++i) {
+        ASSERT_EQ(0, pthread_create(&wrth[i], NULL, wrlocker, &rw));
+    }
+
+    for (size_t i = 0; i < ARRAY_SIZE(rdth); ++i) {
+        pthread_join(rdth[i], NULL);
+    }
+    // Bound the writer loop by the writer array, not by the reader array.
+    for (size_t i = 0; i < ARRAY_SIZE(wrth); ++i) {
+        pthread_join(wrth[i], NULL);
+    }
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+// Thread entry: expects a timed read lock on the rwlock passed in `arg' to
+// fail with ETIMEDOUT when given the deadline {-2, 0}. Always returns NULL.
+void* do_timedrdlock(void *arg) {
+    bthread_rwlock_t* const lock = static_cast<bthread_rwlock_t*>(arg);
+    struct timespec deadline = { -2, 0 };
+    EXPECT_EQ(ETIMEDOUT, bthread_rwlock_timedrdlock(lock, &deadline));
+    return NULL;
+}
+
+// Thread entry: expects a timed write lock on the rwlock passed in `arg' to
+// fail with ETIMEDOUT when given the deadline {-2, 0}. Always returns NULL.
+void* do_timedwrlock(void *arg) {
+    struct timespec t = { -2, 0 };
+    EXPECT_EQ(ETIMEDOUT, bthread_rwlock_timedwrlock((bthread_rwlock_t*)arg, &t));
+    return NULL;
+}
+
+// Timed lock attempts from another bthread must time out while the test
+// thread holds the lock: a writer against a held read lock, then a writer
+// and a reader against a held write lock.
+TEST(RWLockTest, timedlock) {
+    bthread_rwlock_t lock;
+    ASSERT_EQ(0, bthread_rwlock_init(&lock, NULL));
+    bthread_t worker;
+
+    // Read lock held here: a timed write lock elsewhere times out.
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&lock));
+    ASSERT_EQ(0, bthread_start_urgent(&worker, NULL, do_timedwrlock, &lock));
+    ASSERT_EQ(0, bthread_join(worker, NULL));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&lock));
+
+    // Write lock held here: both timed write and timed read locks time out.
+    ASSERT_EQ(0, bthread_rwlock_wrlock(&lock));
+    ASSERT_EQ(0, bthread_start_urgent(&worker, NULL, do_timedwrlock, &lock));
+    ASSERT_EQ(0, bthread_join(worker, NULL));
+    ASSERT_EQ(0, bthread_start_urgent(&worker, NULL, do_timedrdlock, &lock));
+    ASSERT_EQ(0, bthread_join(worker, NULL));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&lock));
+
+    ASSERT_EQ(0, bthread_rwlock_destroy(&lock));
+}
+
+// Argument of do_tryrdlock/do_trywrlock.
+struct TrylockArgs {
+ // Lock to try.
+ bthread_rwlock_t* rw;
+ // Return code the try-lock call is expected to produce.
+ int rc;
+};
+
+// Thread entry: expects bthread_rwlock_tryrdlock() on args->rw to return
+// args->rc. When success (0) is expected, the lock is released again and
+// the unlock is expected to return 0 as well. Always returns NULL.
+void* do_tryrdlock(void *arg) {
+    TrylockArgs* const expected = static_cast<TrylockArgs*>(arg);
+    EXPECT_EQ(expected->rc, bthread_rwlock_tryrdlock(expected->rw));
+    if (0 == expected->rc) {
+        EXPECT_EQ(expected->rc, bthread_rwlock_unlock(expected->rw));
+    }
+    return NULL;
+}
+
+// Thread entry: expects bthread_rwlock_trywrlock() on args->rw to return
+// args->rc. When success (0) is expected, the lock is released again and
+// the unlock is expected to return 0 as well. Always returns NULL.
+void* do_trywrlock(void *arg) {
+    TrylockArgs* const expected = static_cast<TrylockArgs*>(arg);
+    EXPECT_EQ(expected->rc, bthread_rwlock_trywrlock(expected->rw));
+    if (0 == expected->rc) {
+        EXPECT_EQ(expected->rc, bthread_rwlock_unlock(expected->rw));
+    }
+    return NULL;
+}
+
+// Try-lock semantics: with a read lock held, another reader succeeds and a
+// writer gets EBUSY; with a write lock held, both readers and writers get
+// EBUSY.
+TEST(RWLockTest, trylock) {
+    bthread_rwlock_t lock;
+    ASSERT_EQ(0, bthread_rwlock_init(&lock, NULL));
+
+    // Uncontended try-read.
+    ASSERT_EQ(0, bthread_rwlock_tryrdlock(&lock));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&lock));
+
+    // Hold the read lock in this thread.
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&lock));
+    bthread_t worker;
+    TrylockArgs expected{&lock, 0};
+    ASSERT_EQ(0, bthread_start_urgent(&worker, NULL, do_tryrdlock, &expected));
+    ASSERT_EQ(0, bthread_join(worker, NULL));
+    expected.rc = EBUSY;
+    ASSERT_EQ(0, bthread_start_urgent(&worker, NULL, do_trywrlock, &expected));
+    ASSERT_EQ(0, bthread_join(worker, NULL));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&lock));
+
+    // Uncontended try-write.
+    ASSERT_EQ(0, bthread_rwlock_trywrlock(&lock));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&lock));
+
+    // Hold the write lock in this thread; `expected.rc' is still EBUSY.
+    ASSERT_EQ(0, bthread_rwlock_wrlock(&lock));
+    ASSERT_EQ(0, bthread_start_urgent(&worker, NULL, do_tryrdlock, &expected));
+    ASSERT_EQ(0, bthread_join(worker, NULL));
+    ASSERT_EQ(0, bthread_start_urgent(&worker, NULL, do_trywrlock, &expected));
+    ASSERT_EQ(0, bthread_join(worker, NULL));
+    ASSERT_EQ(0, bthread_rwlock_unlock(&lock));
+
+    ASSERT_EQ(0, bthread_rwlock_destroy(&lock));
+}
+
+// Exercises the C++ wrapper bthread::RWLock and every guard type on an
+// uncontended lock.
+TEST(RWLockTest, cpp_wrapper) {
+    bthread::RWLock lock;
+
+    // Read side: try then blocking.
+    ASSERT_TRUE(lock.try_rdlock());
+    lock.unlock();
+    lock.rdlock();
+    lock.unlock();
+
+    // Write side: try then blocking.
+    ASSERT_TRUE(lock.try_wrlock());
+    lock.unlock();
+    lock.wrlock();
+    lock.unlock();
+
+    // Timed variants are expected to succeed on a free lock with this
+    // deadline.
+    struct timespec deadline = { -2, 0 };
+    ASSERT_TRUE(lock.timed_rdlock(&deadline));
+    lock.unlock();
+    ASSERT_TRUE(lock.timed_wrlock(&deadline));
+    lock.unlock();
+
+    // Each guard locks in its constructor and unlocks at the closing brace.
+    { bthread::RWLockRdGuard guard(lock); }
+    { bthread::RWLockWrGuard guard(lock); }
+    { std::lock_guard<bthread::RWLock> guard(lock, true); }
+    { std::lock_guard<bthread::RWLock> guard(lock, false); }
+    { std::lock_guard<bthread_rwlock_t> guard(*lock.native_handler(), true); }
+    { std::lock_guard<bthread_rwlock_t> guard(*lock.native_handler(), false); }
+}
+
+// Start/stop signals shared between the tests below and their worker
+// threads: workers poll them, the test body sets them.
+// NOTE(review): both are plain bools accessed from several threads without
+// synchronization.
+bool g_started = false;
+bool g_stopped = false;
+
+// Takes the read lock of `rw', optionally sleeps `sleep_us' microseconds
+// while holding it, then releases it. Both calls must return 0.
+void read_op(bthread_rwlock_t* rw, int64_t sleep_us) {
+    ASSERT_EQ(0, bthread_rwlock_rdlock(rw));
+    const bool should_sleep = (0 != sleep_us);
+    if (should_sleep) {
+        bthread_usleep(sleep_us);
+    }
+    ASSERT_EQ(0, bthread_rwlock_unlock(rw));
+}
+
+// Takes the write lock of `rw', optionally sleeps `sleep_us' microseconds
+// while holding it, then releases it. Both calls must return 0.
+void write_op(bthread_rwlock_t* rw, int64_t sleep_us) {
+    ASSERT_EQ(0, bthread_rwlock_wrlock(rw));
+    const bool should_sleep = (0 != sleep_us);
+    if (should_sleep) {
+        bthread_usleep(sleep_us);
+    }
+    ASSERT_EQ(0, bthread_rwlock_unlock(rw));
+}
+
+// Signature shared by read_op and write_op.
+typedef void (*OP)(bthread_rwlock_t* rw, int64_t sleep_us);
+
+// Argument of loop_until_stopped.
+struct MixThreadArg {
+ // Lock passed to `op'.
+ bthread_rwlock_t* rw;
+ // Operation to repeat on `rw'.
+ OP op;
+};
+
+// Thread entry: repeats the operation described by the MixThreadArg in `arg'
+// with a 20us sleep until g_stopped becomes true. Always returns NULL.
+void* loop_until_stopped(void* arg) {
+    MixThreadArg* const job = static_cast<MixThreadArg*>(arg);
+    for (; !g_stopped; ) {
+        job->op(job->rw, 20);
+    }
+    return NULL;
+}
+
+// Hammers one rwlock for about a second from pthreads, normal bthreads and
+// BTHREAD_ATTR_PTHREAD bthreads, alternating readers and writers.
+TEST(RWLockTest, mix_thread_types) {
+    g_stopped = false;
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    const int N = 16;
+    const int M = N * 2;
+    pthread_t pthreads[N];
+    bthread_t bthreads[M];
+    // Reserve enough workers for the test. This is a must since the
+    // BTHREAD_ATTR_PTHREAD bthreads may cause deadlocks: if the
+    // bthread_usleep below can't be scheduled, g_stopped is never set and
+    // loop_until_stopped spins forever.
+    bthread_setconcurrency(M);
+    std::vector<MixThreadArg> args;
+    // Threads keep pointers to the elements, so the vector must never
+    // reallocate while they run.
+    args.reserve(N + M);
+
+    // Even index: reader, odd index: writer.
+    for (int i = 0; i < N; ++i) {
+        args.push_back({&rw, (i % 2 == 0) ? read_op : write_op});
+        ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped, &args.back()));
+    }
+
+    for (int i = 0; i < M; ++i) {
+        args.push_back({&rw, (i % 2 == 0) ? read_op : write_op});
+        const bthread_attr_t* attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD;
+        ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &args.back()));
+    }
+
+    // Let the workers run for one second, then stop and collect them.
+    bthread_usleep(1000L * 1000);
+    g_stopped = true;
+    for (int i = 0; i < M; ++i) {
+        bthread_join(bthreads[i], NULL);
+    }
+    for (int i = 0; i < N; ++i) {
+        pthread_join(pthreads[i], NULL);
+    }
+
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+// Per-thread state of the performance test, one instance per worker.
+struct BAIDU_CACHELINE_ALIGNMENT PerfArgs {
+ // Lock under test.
+ bthread_rwlock_t* rw;
+ // Number of lock/unlock rounds completed by the worker.
+ int64_t counter;
+ // Time the worker spent in its measured loop, in nanoseconds.
+ int64_t elapse_ns;
+ // Set by the worker once it has started running.
+ bool ready;
+
+ PerfArgs() : rw(NULL), counter(0), elapse_ns(0), ready(false) {}
+};
+
+// Worker of the performance test. `void_arg' points to a PerfArgs.
+// Marks itself ready, waits for g_started (or g_stopped), then repeatedly
+// locks (read lock when Reader is true, write lock otherwise), bumps its
+// counter and unlocks until g_stopped is set. The duration of that loop is
+// stored in elapse_ns. Always returns NULL.
+template <bool Reader>
+void* add_with_mutex(void* void_arg) {
+    PerfArgs* const perf = static_cast<PerfArgs*>(void_arg);
+    perf->ready = true;
+    butil::Timer timer;
+    // Hold back until the test body releases all workers together.
+    while (!g_stopped && !g_started) {
+        bthread_usleep(10);
+    }
+    timer.start();
+    for (; !g_stopped; ) {
+        if (Reader) {
+            bthread_rwlock_rdlock(perf->rw);
+        } else {
+            bthread_rwlock_wrlock(perf->rw);
+        }
+        ++perf->counter;
+        bthread_rwlock_unlock(perf->rw);
+    }
+    timer.stop();
+    perf->elapse_ns = timer.n_elapsed();
+    return NULL;
+}
+
+// Sequence number used to give each profile file written by PerfTest a
+// distinct name.
+int g_prof_name_counter = 0;
+
+// Measures lock/unlock throughput of one bthread rwlock shared by
+// `thread_num' workers for about one second and logs per-role averages.
+//   writer_ratio: percentage (0..100) of workers that take the write lock;
+//                 the rest take the read lock.
+//   ThreadId/create_fn/join_fn: thread flavor to use, e.g. pthread_t with
+//                 pthread_create/pthread_join or bthread_t with
+//                 bthread_start_background/bthread_join.
+template <typename ThreadId, typename ThreadCreateFn, typename ThreadJoinFn>
+void PerfTest(uint32_t writer_ratio, ThreadId* /*dummy*/, int thread_num,
+              const ThreadCreateFn& create_fn, const ThreadJoinFn& join_fn) {
+    ASSERT_LE(writer_ratio, 100U);
+
+    g_started = false;
+    g_stopped = false;
+    bthread_setconcurrency(thread_num + 4);
+    std::vector<ThreadId> threads(thread_num);
+    std::vector<PerfArgs> args(thread_num);
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+    int writer_num = thread_num * writer_ratio / 100;
+    int reader_num = thread_num - writer_num;
+    // The first `writer_num' workers are writers, the others are readers.
+    for (int i = 0; i < thread_num; ++i) {
+        args[i].rw = &rw;
+        if (i < writer_num) {
+            ASSERT_EQ(0, create_fn(&threads[i], NULL, add_with_mutex<false>, &args[i]));
+        } else {
+            ASSERT_EQ(0, create_fn(&threads[i], NULL, add_with_mutex<true>, &args[i]));
+        }
+    }
+    // Wait until every worker has reported in before releasing them.
+    while (true) {
+        bool all_ready = true;
+        for (int i = 0; i < thread_num; ++i) {
+            if (!args[i].ready) {
+                all_ready = false;
+                break;
+            }
+        }
+        if (all_ready) {
+            break;
+        }
+        usleep(1000);
+    }
+    g_started = true;
+    char prof_name[32];
+    snprintf(prof_name, sizeof(prof_name), "bthread_rwlock_perf_%d.prof", ++g_prof_name_counter);
+    ProfilerStart(prof_name);
+    usleep(1000 * 1000);
+    ProfilerStop();
+    g_stopped = true;
+
+    int64_t read_wait_time = 0;
+    int64_t read_count = 0;
+    int64_t write_wait_time = 0;
+    int64_t write_count = 0;
+    for (int i = 0; i < thread_num; ++i) {
+        ASSERT_EQ(0, join_fn(threads[i], NULL));
+        if (i < writer_num) {
+            write_wait_time += args[i].elapse_ns;
+            write_count += args[i].counter;
+        } else {
+            read_wait_time += args[i].elapse_ns;
+            read_count += args[i].counter;
+        }
+    }
+    LOG(INFO) << "bthread rwlock in "
+              << ((void*)create_fn == (void*)pthread_create ? "pthread" : "bthread")
+              << " thread_num=" << thread_num
+              << " writer_ratio=" << writer_ratio
+              << " reader_num=" << reader_num
+              << " read_count=" << read_count
+              << " read_average_time=" << (read_count == 0 ? 0 : read_wait_time / (double)read_count)
+              << " writer_num=" << writer_num
+              << " write_count=" << write_count
+              << " write_average_time=" << (write_count == 0 ? 0 : write_wait_time / (double)write_count);
+
+    // All workers are joined: release the lock's resources like the other
+    // tests in this file do.
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+// Runs PerfTest with 12 workers for several writer ratios, alternating
+// between pthreads and bthreads.
+TEST(RWLockTest, performance) {
+    const int thread_num = 12;
+    pthread_t* const use_pthread = NULL;
+    bthread_t* const use_bthread = NULL;
+
+    // Readers only.
+    PerfTest(0, use_pthread, thread_num, pthread_create, pthread_join);
+    PerfTest(0, use_bthread, thread_num, bthread_start_background, bthread_join);
+    // Mostly readers.
+    PerfTest(10, use_pthread, thread_num, pthread_create, pthread_join);
+    PerfTest(20, use_bthread, thread_num, bthread_start_background, bthread_join);
+    // Writers only.
+    PerfTest(100, use_pthread, thread_num, pthread_create, pthread_join);
+    PerfTest(100, use_bthread, thread_num, bthread_start_background, bthread_join);
+}
+
+
void* read_thread(void* arg) {
const size_t N = 10000;
#ifdef CHECK_RWLOCK
@@ -49,7 +428,7 @@ void* write_thread(void*) {
return NULL;
}
-TEST(RWLockTest, rdlock_performance) {
+TEST(RWLockTest, pthread_rdlock_performance) {
#ifdef CHECK_RWLOCK
pthread_rwlock_t lock1;
ASSERT_EQ(0, pthread_rwlock_init(&lock1, NULL));
diff --git a/test/bthread_semaphore_unittest.cpp
b/test/bthread_semaphore_unittest.cpp
new file mode 100644
index 00000000..cc598a4c
--- /dev/null
+++ b/test/bthread_semaphore_unittest.cpp
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <bthread/bthread.h>
+
+namespace {
+
+// Number of wait/post operations each worker thread performs.
+const size_t SEM_COUNT = 10000;
+
+// Thread entry: sleeps 10ms, then waits SEM_COUNT times on the semaphore
+// passed in `arg'. Always returns NULL.
+void* sem_waiter(void* arg) {
+    bthread_usleep(10 * 1000);
+    bthread_sem_t* const sem = static_cast<bthread_sem_t*>(arg);
+    size_t remaining = SEM_COUNT;
+    while (remaining > 0) {
+        bthread_sem_wait(sem);
+        --remaining;
+    }
+    return NULL;
+}
+
+// Thread entry: sleeps 10ms, then posts SEM_COUNT times to the semaphore
+// passed in `arg'. Always returns NULL.
+void* sem_poster(void* arg) {
+    bthread_usleep(10 * 1000);
+    bthread_sem_t* const sem = static_cast<bthread_sem_t*>(arg);
+    size_t remaining = SEM_COUNT;
+    while (remaining > 0) {
+        bthread_sem_post(sem);
+        --remaining;
+    }
+    return NULL;
+}
+
+// Basic wait/post round trip from the test thread, then one waiter bthread
+// and one poster bthread exchanging SEM_COUNT tokens.
+TEST(SemaphoreTest, sanity) {
+    bthread_sem_t sem;
+    ASSERT_EQ(0, bthread_sem_init(&sem, 1));
+    // Consume the initial token, give it back, consume it again: the
+    // counter is back to zero before the workers start.
+    ASSERT_EQ(0, bthread_sem_wait(&sem));
+    ASSERT_EQ(0, bthread_sem_post(&sem));
+    ASSERT_EQ(0, bthread_sem_wait(&sem));
+
+    bthread_t waiter;
+    bthread_t poster;
+    ASSERT_EQ(0, bthread_start_urgent(&waiter, NULL, sem_waiter, &sem));
+    ASSERT_EQ(0, bthread_start_urgent(&poster, NULL, sem_poster, &sem));
+    ASSERT_EQ(0, bthread_join(waiter, NULL));
+    ASSERT_EQ(0, bthread_join(poster, NULL));
+
+    ASSERT_EQ(0, bthread_sem_destroy(&sem));
+}
+
+
+
+// Runs 8 waiter pthreads and 8 poster pthreads against one bthread
+// semaphore and waits for all of them.
+TEST(SemaphoreTest, used_in_pthread) {
+    bthread_sem_t sem;
+    ASSERT_EQ(0, bthread_sem_init(&sem, 0));
+
+    const size_t kThreadsPerRole = 8;
+    pthread_t waiters[kThreadsPerRole];
+    pthread_t posters[kThreadsPerRole];
+    for (size_t i = 0; i < kThreadsPerRole; ++i) {
+        ASSERT_EQ(0, pthread_create(&waiters[i], NULL, sem_waiter, &sem));
+    }
+    for (size_t i = 0; i < kThreadsPerRole; ++i) {
+        ASSERT_EQ(0, pthread_create(&posters[i], NULL, sem_poster, &sem));
+    }
+    for (size_t i = 0; i < kThreadsPerRole; ++i) {
+        pthread_join(waiters[i], NULL);
+    }
+    for (size_t i = 0; i < kThreadsPerRole; ++i) {
+        pthread_join(posters[i], NULL);
+    }
+
+    ASSERT_EQ(0, bthread_sem_destroy(&sem));
+}
+
+// Thread entry: expects a timed wait on the semaphore passed in `arg' to
+// fail with ETIMEDOUT when given the deadline {-2, 0}. Always returns NULL.
+void* do_timedwait(void *arg) {
+    bthread_sem_t* const sem = static_cast<bthread_sem_t*>(arg);
+    struct timespec deadline = { -2, 0 };
+    EXPECT_EQ(ETIMEDOUT, bthread_sem_timedwait(sem, &deadline));
+    return NULL;
+}
+
+// A timed wait on a semaphore whose counter is zero must time out.
+TEST(SemaphoreTest, timedwait) {
+    bthread_sem_t sem;
+    ASSERT_EQ(0, bthread_sem_init(&sem, 0));
+
+    bthread_t worker;
+    ASSERT_EQ(0, bthread_start_urgent(&worker, NULL, do_timedwait, &sem));
+    ASSERT_EQ(0, bthread_join(worker, NULL));
+
+    ASSERT_EQ(0, bthread_sem_destroy(&sem));
+}
+
+
+// Argument of do_trywait.
+struct TryWaitArgs {
+ // Semaphore to try.
+ bthread_sem_t* sem;
+ // Return code bthread_sem_trywait() is expected to produce.
+ int rc;
+};
+
+// Thread entry: expects bthread_sem_trywait() on args->sem to return
+// args->rc. Always returns NULL.
+void* do_trywait(void *arg) {
+    TryWaitArgs* const expected = static_cast<TryWaitArgs*>(arg);
+    EXPECT_EQ(expected->rc, bthread_sem_trywait(expected->sem));
+    return NULL;
+}
+
+// bthread_sem_trywait() returns EAGAIN while the counter is zero and 0 when
+// a token is available, both from the test thread and from a bthread.
+TEST(SemaphoreTest, trywait) {
+    bthread_sem_t sem;
+    ASSERT_EQ(0, bthread_sem_init(&sem, 0));
+
+    // From the test thread: empty, one token, empty again.
+    ASSERT_EQ(EAGAIN, bthread_sem_trywait(&sem));
+    ASSERT_EQ(0, bthread_sem_post(&sem));
+    ASSERT_EQ(0, bthread_sem_trywait(&sem));
+    ASSERT_EQ(EAGAIN, bthread_sem_trywait(&sem));
+
+    // From a bthread: the first attempt takes the posted token, the second
+    // finds the semaphore empty.
+    ASSERT_EQ(0, bthread_sem_post(&sem));
+    bthread_t worker;
+    TryWaitArgs expected{ &sem, 0};
+    ASSERT_EQ(0, bthread_start_urgent(&worker, NULL, do_trywait, &expected));
+    ASSERT_EQ(0, bthread_join(worker, NULL));
+    expected.rc = EAGAIN;
+    ASSERT_EQ(0, bthread_start_urgent(&worker, NULL, do_trywait, &expected));
+    ASSERT_EQ(0, bthread_join(worker, NULL));
+
+    ASSERT_EQ(0, bthread_sem_destroy(&sem));
+}
+
+// NOTE(review): within this file g_started is never used and g_stopped is
+// only assigned (in mix_thread_types), never read.
+bool g_started = false;
+bool g_stopped = false;
+
+// Waits once on `sem' (must return 0), then optionally sleeps `sleep_us'
+// microseconds.
+void wait_op(bthread_sem_t* sem, int64_t sleep_us) {
+    ASSERT_EQ(0, bthread_sem_wait(sem));
+    const bool should_sleep = (0 != sleep_us);
+    if (should_sleep) {
+        bthread_usleep(sleep_us);
+    }
+}
+
+// Posts once to the semaphore `rw' (must return 0), then optionally sleeps
+// `sleep_us' microseconds.
+void post_op(bthread_sem_t* rw, int64_t sleep_us) {
+    ASSERT_EQ(0, bthread_sem_post(rw));
+    const bool should_sleep = (0 != sleep_us);
+    if (should_sleep) {
+        bthread_usleep(sleep_us);
+    }
+}
+
+// Signature shared by wait_op and post_op.
+typedef void (*OP)(bthread_sem_t* sem, int64_t sleep_us);
+
+// Argument of loop_until_stopped.
+struct MixThreadArg {
+ // Semaphore passed to `op'.
+ bthread_sem_t* sem;
+ // Operation to repeat on `sem'.
+ OP op;
+};
+
+// Thread entry: runs the operation described by the MixThreadArg in `arg'
+// exactly SEM_COUNT times with a 20us sleep each. Despite its name it does
+// not look at g_stopped. Always returns NULL.
+void* loop_until_stopped(void* arg) {
+    MixThreadArg* const job = static_cast<MixThreadArg*>(arg);
+    size_t remaining = SEM_COUNT;
+    while (remaining > 0) {
+        job->op(job->sem, 20);
+        --remaining;
+    }
+    return NULL;
+}
+
+// Shares one semaphore between pthreads, normal bthreads and
+// BTHREAD_ATTR_PTHREAD bthreads. Waiters and posters are created in equal
+// numbers and each performs SEM_COUNT operations, so every wait is matched
+// by a post.
+TEST(SemaphoreTest, mix_thread_types) {
+    g_stopped = false;
+    bthread_sem_t sem;
+    ASSERT_EQ(0, bthread_sem_init(&sem, 0));
+
+    const int N = 16;
+    const int M = N * 2;
+    pthread_t pthreads[N];
+    bthread_t bthreads[M];
+    // Reserve enough workers for the test: the BTHREAD_ATTR_PTHREAD bthreads
+    // may otherwise leave no worker to run the bthreads they are waiting
+    // for.
+    bthread_setconcurrency(M);
+    std::vector<MixThreadArg> args;
+    // Threads keep pointers to the elements, so the vector must never
+    // reallocate while they run.
+    args.reserve(N + M);
+
+    // Even index: waiter, odd index: poster.
+    for (int i = 0; i < N; ++i) {
+        args.push_back({ &sem, (i % 2 == 0) ? wait_op : post_op });
+        ASSERT_EQ(0, pthread_create(&pthreads[i], NULL, loop_until_stopped, &args.back()));
+    }
+
+    for (int i = 0; i < M; ++i) {
+        args.push_back({ &sem, (i % 2 == 0) ? wait_op : post_op });
+        const bthread_attr_t* attr = i % 2 ? NULL : &BTHREAD_ATTR_PTHREAD;
+        ASSERT_EQ(0, bthread_start_urgent(&bthreads[i], attr, loop_until_stopped, &args.back()));
+    }
+
+    for (int i = 0; i < M; ++i) {
+        bthread_join(bthreads[i], NULL);
+    }
+    for (int i = 0; i < N; ++i) {
+        pthread_join(pthreads[i], NULL);
+    }
+
+    ASSERT_EQ(0, bthread_sem_destroy(&sem));
+}
+
+} // namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]