This is an automated email from the ASF dual-hosted git repository.

chenBright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new f85f1c57 Reimplement writer-priority RWLock (#3286)
f85f1c57 is described below

commit f85f1c57041f20256ec7dcd5241ad5568391ac99
Author: Bright Chen <[email protected]>
AuthorDate: Tue May 19 16:45:22 2026 +0800

    Reimplement writer-priority RWLock (#3286)
    
    The previous go-like RWLock bumps the reader count first and rolls it
    back only on the full success path. The state is not reversible on a
    partial failure, so a read timeout while a writer holds the lock leaves a
    dangling reader credit and permanently blocks future writers (issue #3051).
    For the same reason the old implementation could not offer correct
    try_/timed_ APIs at all.
    
    Co-authored-by: hairet
---
 src/bthread/rwlock.cpp           | 666 ++++++++++++++++++++++++---------------
 src/bthread/types.h              |  26 +-
 test/bthread_rwlock_unittest.cpp | 253 ++++++++++++++-
 3 files changed, 681 insertions(+), 264 deletions(-)

diff --git a/src/bthread/rwlock.cpp b/src/bthread/rwlock.cpp
index e6356683..e28f5ccb 100644
--- a/src/bthread/rwlock.cpp
+++ b/src/bthread/rwlock.cpp
@@ -15,350 +15,508 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <memory>
 #include "bvar/collector.h"
+#include "butil/memory/scope_guard.h"
 #include "bthread/rwlock.h"
+#include "bthread/mutex.h"
 #include "bthread/butex.h"
 
 namespace bthread {
 
-// A `bthread_rwlock_t' is a reader/writer mutual exclusion lock,
-// which is a bthread implementation of golang RWMutex.
-// The lock can be held by an arbitrary number of readers or a single writer.
-// For details, see https://github.com/golang/go/blob/master/src/sync/rwmutex.go
-
-// Define in bthread/mutex.cpp
+// Defined in bthread/mutex.cpp; reused here so that bthread_rwlock_t
+// participates in the global ContentionProfiler just like bthread_mutex_t
+// and bthread_sem_t.
 class ContentionProfiler;
 extern ContentionProfiler* g_cp;
 extern bvar::CollectorSpeedLimit g_cp_sl;
-extern bool is_contention_site_valid(const bthread_contention_site_t& cs);
-extern void make_contention_site_invalid(bthread_contention_site_t* cs);
 extern void submit_contention(const bthread_contention_site_t& csite, int64_t 
now_ns);
 
-// It is enough for readers. If the reader exceeds this value,
-// need to use `int64_t' instead of `int'.
-const int RWLockMaxReaders = 1 << 30;
-
-// For reading.
-static int rwlock_rdlock_impl(bthread_rwlock_t* __restrict rwlock,
-                              const struct timespec* __restrict abstime) {
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-        ->fetch_add(1, butil::memory_order_acquire) + 1;
-    // Fast path.
-    if (reader_count >= 0) {
-        CHECK_LT(reader_count, RWLockMaxReaders);
-        return 0;
-    }
-
-    // Slow path.
+// Lazily arm sampling on first contention. Caller must declare
+// `size_t sampling_range' and `int64_t start_ns' in scope:
+//   start_ns ==  0 -> not yet decided
+//   start_ns == -1 -> decided NOT to sample (profiler off / not selected)
+//   start_ns  >  0 -> sampling armed; value is the wall-clock start time
+#define BTHREAD_RWLOCK_MAYBE_START_SAMPLING                                    
   \
+    do {                                                                       
   \
+        if (start_ns == 0) {                                                   
   \
+            if (BAIDU_UNLIKELY(g_cp != NULL)) {                                
   \
+                sampling_range = bvar::is_collectable(&g_cp_sl);               
   \
+                start_ns = bvar::is_sampling_range_valid(sampling_range) ?     
   \
+                    butil::cpuwide_time_ns() : -1;                             
   \
+            } else {                                                           
   \
+                start_ns = -1;                                                 
   \
+            }                                                                  
   \
+        }                                                                      
   \
+    } while (0)
 
-    // Don't sample when contention profiler is off.
-    if (NULL == bthread::g_cp) {
-        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
-    }
-    // Ask Collector if this (contended) locking should be sampled.
-    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
-    if (!bvar::is_sampling_range_valid(sampling_range)) { // Don't sample.
-        return bthread_sem_timedwait(&rwlock->reader_sema, abstime);
+// Submit one contention sample if sampling was armed for this attempt.
+// `start_ns > 0' is the convention used everywhere in this file to indicate
+// that BTHREAD_RWLOCK_MAYBE_START_SAMPLING actually decided to sample.
+// No-op otherwise. Force-inlined so the uncontended fast path stays cheap.
+static BUTIL_FORCE_INLINE void submit_contention_if_sampled(
+        int64_t start_ns, size_t sampling_range) {
+    if (BAIDU_UNLIKELY(start_ns > 0)) {
+        const int64_t end_ns = butil::cpuwide_time_ns();
+        const bthread_contention_site_t csite{end_ns - start_ns, 
sampling_range};
+        submit_contention(csite, end_ns);
     }
-
-    // Sample.
-    const int64_t start_ns = butil::cpuwide_time_ns();
-    int rc = bthread_sem_timedwait(&rwlock->reader_sema, abstime);
-    const int64_t end_ns = butil::cpuwide_time_ns();
-    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
-    // Submit `csite' for each reader immediately after
-    // owning rdlock to avoid the contention of `csite'.
-    bthread::submit_contention(csite, end_ns);
-
-    return rc;
-}
-
-static inline int rwlock_rdlock(bthread_rwlock_t* rwlock) {
-    return rwlock_rdlock_impl(rwlock, NULL);
 }
 
-static inline int rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
-                                     const struct timespec* __restrict 
abstime) {
-    return rwlock_rdlock_impl(rwlock, abstime);
-}
+// bthread RWLock
+// writer-priority implementation overview
+// Three synchronization fields are used:
+//
+//   * `lock_word' (32-bit butex):
+//       bit 31  : 1 if the write lock is held, 0 otherwise.
+//       bit 0~30: number of readers currently holding the read lock.
+//       Mutually exclusive: when bit 31 is set, the lower 31 bits are 0.
+//
+//   * `writer_wait_count' (32-bit butex):
+//       Number of writers that have entered wrlock() but not yet finished
+//       (i.e. currently waiting for the mutex / waiting for lock_word==0 /
+//       holding the write lock). Each writer accounts for itself: it is
+//       incremented at the very beginning of wrlock() and decremented at
+//       the very end of unwrlock()/cleanup().
+//       Readers consult this field to implement writer-priority: if any
+//       writer is "in flight", new readers yield by waiting on it.
+//
+//   * `writer_queue_mutex' (bthread_mutex_t):
+//       Serializes writers so that at most one writer races for `lock_word'
+//       at any time. Other writers queue up on this mutex.
+//
+// Wakeup channels:
+//   * Readers waiting on writers   -> wait on  writer_wait_count, woken by unwrlock/cleanup
+//   * Writers waiting on readers   -> wait on  lock_word, woken by unrdlock
+//   * Writers waiting on writers   -> wait on  writer_queue_mutex
+
+static int rwlock_rdlock(bthread_rwlock_t* rwlock, bool try_lock,
+                         const struct timespec* abstime) {
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    auto writer_wait_count = 
(butil::atomic<unsigned>*)rwlock->writer_wait_count;
+
+    // Sampling state for the contention profiler (lazily armed on first
+    // contention so that the uncontended fast path stays cheap):
+    //   start_ns  == 0  -> not yet decided
+    //   start_ns  == -1 -> decided NOT to sample
+    //   start_ns  >  0  -> sampling armed; submit on exit
+    // Each reader samples independently and submits once on its own way out;
+    // we deliberately do NOT use rwlock->writer_csite here because that field
+    // is exclusively owned by the writer.
+    size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
+    int64_t start_ns = 0;
+    int rc = 0;
 
-// Returns 0 if the lock was acquired, otherwise errno.
-static  inline int rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
     while (true) {
-        int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-            ->load(butil::memory_order_relaxed);
-        if (reader_count < 0) {
-            // Failed to acquire the read lock because there is a writer.
-            return EBUSY;
-        }
-        if (((butil::atomic<int>*)&rwlock->reader_count)
-                ->compare_exchange_weak(reader_count, reader_count + 1,
-                                        butil::memory_order_acquire,
-                                        butil::memory_order_relaxed)) {
-            return 0;
+        // Writer-priority: if any writer is in flight, yield to it.
+        // `relaxed' is sufficient here because:
+        //   - There is no data published via writer_wait_count;
+        //     data visibility is established via the acquire-CAS on
+        //     `lock_word' below paired with the release-CAS in unwrlock().
+        //   - butex_wait() will re-check the expected value before sleeping,
+        //     so we cannot lose a wakeup even if `w' is slightly stale.
+        unsigned w = writer_wait_count->load(butil::memory_order_relaxed);
+        if (w > 0) {
+            if (try_lock) {
+                // Don't sample tryrdlock failures: they are by design a
+                // non-blocking probe, not a contention event.
+                return EBUSY;
+            }
+            // We are about to block on writer_wait_count; arm sampling
+            // before parking so the wait time is included in the report.
+            BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+            if (butex_wait(writer_wait_count, w, abstime) < 0 &&
+                errno != EWOULDBLOCK && errno != EINTR) {
+                rc = errno;
+                break;
+            }
+            continue;
         }
-    }
-}
 
-static inline int rwlock_unrdlock(bthread_rwlock_t* rwlock) {
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-        ->fetch_add(-1, butil::memory_order_relaxed) - 1;
-    // Fast path.
-    if (reader_count >= 0) {
-        return 0;
+        // No writer in flight: try to add ourselves to the reader count.
+        // 2^31 - 1 readers should be enough for any realistic workload.
+        unsigned l = lock_word->load(butil::memory_order_relaxed);
+        if ((l >> 31) == 0) {
+            // Refuse to increment when the reader count has saturated
+            // the low 31 bits. Otherwise `l + 1' would flip bit 31 and
+            // we would corrupt lock_word into "writer held" state.
+            // POSIX-style: report EAGAIN ("max read locks exceeded").
+            if (BAIDU_UNLIKELY(l == 0x7FFFFFFFu)) {
+                LOG(ERROR) << "Too many readers on bthread_rwlock_t=" << 
rwlock;
+                rc = EAGAIN;
+                break;
+            }
+            // Acquire on success synchronizes-with the release-CAS in
+            // unwrlock(), so any data written by the previous writer is
+            // visible to us before we start reading.
+            if (lock_word->compare_exchange_weak(l, l + 1,
+                                                 butil::memory_order_acquire,
+                                                 butil::memory_order_relaxed)) 
{
+                rc = 0;
+                break;
+            }
+            // CAS failed (likely another reader changed lock_word): retry.
+        } else if (try_lock) {
+            // Write lock is currently held.
+            return EBUSY;
+        } else {
+            // Write lock is held even though the load above saw
+            // writer_wait_count == 0: a writer announced itself and took
+            // the lock between our two relaxed loads. Arm sampling now so
+            // the spin/wait until we observe writer_wait_count >= 1 is counted.
+            BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+        }
+        // Otherwise (write lock held but not try_lock): spin once more.
+        // The next iteration will observe writer_wait_count >= 1 (writers
+        // self-account in writer_wait_count for the entire wrlock lifetime),
+        // and we will block on it instead of busy spinning.
     }
-    // Slow path.
 
-    if (BAIDU_UNLIKELY(reader_count + 1 == 0 || reader_count + 1 == 
-RWLockMaxReaders)) {
-        CHECK(false) << "rwlock_unrdlock of unlocked rwlock";
-        return EINVAL;
-    }
+    // Submit one contention sample for this reader (success or failure).
+    submit_contention_if_sampled(start_ns, sampling_range);
+    return rc;
+}
 
-    // A writer is pending.
-    int reader_wait = ((butil::atomic<int>*)&rwlock->reader_wait)
-        ->fetch_add(-1, butil::memory_order_relaxed) - 1;
-    if (reader_wait != 0) {
+static int rwlock_unrdlock(bthread_rwlock_t* rwlock) {
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    while (true) {
+        unsigned l = lock_word->load(butil::memory_order_relaxed);
+        // Misuse detection: the caller must currently hold a read lock.
+        // l == 0           -> no lock is held (double unlock?)
+        // (l >> 31) != 0   -> write lock is held, not read lock
+        if (l == 0 || (l >> 31) != 0) {
+            LOG(ERROR) << "Invalid unrdlock on bthread_rwlock_t=" << rwlock
+                       << ", lock_word=" << l;
+            return EINVAL;
+        }
+        // Release on success publishes any reads/writes done while holding
+        // the read lock to the next acquirer (typically a writer's
+        // acquire-CAS in wrlock()).
+        if(!(lock_word->compare_exchange_weak(l, l - 1,
+                                              butil::memory_order_release,
+                                              butil::memory_order_relaxed))) {
+            continue;
+        }
+        // We were the last reader (lock_word transitioned 1 -> 0). Wake the
+        // single writer (if any) that may be sleeping on `lock_word' inside
+        // wrlock(). At most one writer can be there because writers are
+        // serialized by writer_queue_mutex.
+        // No-op if nobody is waiting; butex_wake() short-circuits cheaply.
+        if (l == 1) {
+            butex_wake(lock_word);
+        }
         return 0;
     }
+}
 
-    // The last reader unblocks the writer.
-
-    if (NULL == bthread::g_cp) {
-        bthread_sem_post(&rwlock->writer_sema);
-        return 0;
+// Roll back the side effects of a failed wrlock attempt:
+//   - Release writer_queue_mutex if we managed to acquire it.
+//   - Decrement our share of writer_wait_count.
+//   - If we were the last in-flight writer, wake all readers that have
+//     been parked by writer-priority (w == 1 means writer_wait_count is now 0).
+// Called on EBUSY (try_lock failed), ETIMEDOUT, EINTR-leading-to-fail.
+static BUTIL_FORCE_INLINE void rwlock_wrlock_cleanup(bthread_rwlock_t* rwlock, 
bool write_queue_locked) {
+    if (write_queue_locked) {
+        bthread_mutex_unlock(&rwlock->writer_queue_mutex);
     }
-    // Ask Collector if this (contended) locking should be sampled.
-    const size_t sampling_range = bvar::is_collectable(&bthread::g_cp_sl);
-    if (!sampling_range) { // Don't sample
-        bthread_sem_post(&rwlock->writer_sema);
-        return 0;
+    auto writer_wait_count = 
(butil::atomic<unsigned>*)rwlock->writer_wait_count;
+    // Withdraw our writer-priority "vote" so readers can make progress.
+    auto w = writer_wait_count->fetch_sub(1, butil::memory_order_relaxed);
+    // w is the value BEFORE the subtraction, so w == 1 means we were the
+    // last writer in flight; wake every reader parked on writer_wait_count.
+    if (w == 1) {
+        butex_wake_all(writer_wait_count);
     }
-
-    // Sampling.
-    const int64_t start_ns = butil::cpuwide_time_ns();
-    bthread_sem_post(&rwlock->writer_sema);
-    const int64_t end_ns = butil::cpuwide_time_ns();
-    const bthread_contention_site_t csite{end_ns - start_ns, sampling_range};
-    // Submit `csite' for each reader immediately after
-    // releasing rdlock to avoid the contention of `csite'.
-    bthread::submit_contention(csite, end_ns);
-    return 0;
 }
 
-#define DO_CSITE_IF_NEED                                                       
       \
-    do {                                                                       
       \
-        /* Don't sample when contention profiler is off. */                    
       \
-        if (NULL != bthread::g_cp) {                                           
       \
-            /* Ask Collector if this (contended) locking should be sampled. */ 
       \
-            sampling_range = bvar::is_collectable(&bthread::g_cp_sl);          
       \
-            start_ns = bvar::is_sampling_range_valid(sampling_range) ?         
       \
-                butil::cpuwide_time_ns() : -1;                                 
       \
-        } else {                                                               
       \
-            start_ns = -1;                                                     
       \
-        }                                                                      
       \
-    } while (0)
-
-#define SUBMIT_CSITE_IF_NEED                                                   
       \
-    do {                                                                       
       \
-        if (ETIMEDOUT == rc && start_ns > 0) {                                 
       \
-            /* Failed to lock due to ETIMEDOUT, submit the elapse directly. */ 
       \
-            const int64_t end_ns = butil::cpuwide_time_ns();                   
       \
-            const bthread_contention_site_t csite{end_ns - start_ns, 
sampling_range}; \
-            bthread::submit_contention(csite, end_ns);                         
       \
-        }                                                                      
       \
-    } while (0)
-
-// For writing.
-static inline int rwlock_wrlock_impl(bthread_rwlock_t* __restrict rwlock,
-                                     const struct timespec* __restrict 
abstime) {
-    // First, resolve competition with other writers.
-    int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex);
+static int rwlock_wrlock(bthread_rwlock_t* rwlock, bool try_lock,
+                         const struct timespec* abstime) {
+    auto writer_wait_count = 
(butil::atomic<unsigned>*)rwlock->writer_wait_count;
+    // Step 1: announce ourselves before doing anything else, so that
+    // concurrent readers immediately observe writer-priority and back off.
+    // This MUST happen before we try to acquire writer_queue_mutex,
+    // otherwise a flood of readers could starve us indefinitely.
+    // 2^31 in-flight writers should be enough for any realistic workload.
+    writer_wait_count->fetch_add(1, butil::memory_order_relaxed);
+
+    // Sampling state for the contention profiler. Both wrlock() and
+    // unwrlock() sample independently: wrlock() submits its own wait time
+    // on the way out (success or failure); unwrlock() samples its own
+    // CAS-spin / mutex_unlock / butex_wake_all latency separately. We do
+    // NOT use rwlock->writer_csite here -- the two operations are not
+    // forced to share a single sample.
     size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
-    // -1: don't sample.
-    // 0: default value.
-    // > 0: Start time of sampling.
     int64_t start_ns = 0;
-    if (0 != rc) {
-        DO_CSITE_IF_NEED;
 
-        rc = bthread_mutex_timedlock(&rwlock->write_queue_mutex, abstime);
+    // Step 2: serialize with other writers. At most one writer holds
+    // `writer_queue_mutex' at a time and races for `lock_word'.
+    int rc = bthread_mutex_trylock(&rwlock->writer_queue_mutex);
+    if (0 != rc) {
+        if (try_lock) {
+            // Fail to acquire the wrlock. Don't sample trywrlock failures:
+            // they are by design a non-blocking probe, not a contention event.
+            rwlock_wrlock_cleanup(rwlock, false);
+            return rc;
+        }
+        // We are about to block on writer_queue_mutex; arm sampling.
+        // Note: the inner mutex itself has csite disabled (see init), so
+        // its blocking time is only counted once -- here, by the rwlock.
+        BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+        rc = bthread_mutex_timedlock(&rwlock->writer_queue_mutex, abstime);
         if (0 != rc) {
-            SUBMIT_CSITE_IF_NEED;
+            // Fail to acquire the wrlock. Submit the elapsed wait time
+            // directly (no unwrlock() will run for this writer).
+            submit_contention_if_sampled(start_ns, sampling_range);
+            rwlock_wrlock_cleanup(rwlock, false);
             return rc;
         }
     }
 
-    // Announce to readers there is a pending writer.
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)
-        ->fetch_add(-RWLockMaxReaders, butil::memory_order_release);
-    // Wait for active readers.
-    if (reader_count != 0 &&
-        ((butil::atomic<int>*)&rwlock->reader_wait)
-            ->fetch_add(reader_count) + reader_count != 0) {
-        rc = bthread_sem_trywait(&rwlock->writer_sema);
-        if (0 != rc) {
-            if (0 == start_ns) {
-                DO_CSITE_IF_NEED;
+    // Step 3: with `writer_queue_mutex' held, wait for all readers to drain
+    // and then claim the write bit of `lock_word'.
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    while (true) {
+        unsigned l = lock_word->load(butil::memory_order_relaxed);
+        if (l != 0) {
+            // Readers still hold the lock. Park on `lock_word' until the last
+            // reader releases (unrdlock will butex_wake on transition 1->0).
+            if (try_lock) {
+                errno = EBUSY;
+                break;
             }
-
-            rc = bthread_sem_timedwait(&rwlock->writer_sema, abstime);
-            if (0 != rc) {
-                SUBMIT_CSITE_IF_NEED;
-                bthread_mutex_unlock(&rwlock->write_queue_mutex);
-                return rc;
+            // Arm sampling before parking so the wait-for-readers time is
+            // counted (in case the queue_mutex acquisition above was uncontended).
+            BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
+            // Use the freshly read `l' as expected; if lock_word changes
+            // before we sleep, butex_wait returns EWOULDBLOCK and we retry.
+            if (butex_wait(lock_word, l, abstime) < 0 &&
+                errno != EWOULDBLOCK && errno != EINTR) {
+                break;
             }
+            continue;
         }
+        // Acquire on success synchronizes-with release-CAS in
+        // unrdlock()/unwrlock(): we will see all data published by the
+        // previous reader/writer before we start writing.
+        if (lock_word->compare_exchange_weak(l, (unsigned)(1 << 31),
+                                             butil::memory_order_acquire,
+                                             butil::memory_order_relaxed)) {
+            // Submit the writer's wait sample immediately on success.
+            // unwrlock() will sample its own latency separately.
+            submit_contention_if_sampled(start_ns, sampling_range);
+            return 0;
+        }
+        // CAS may spuriously fail (weak); retry without sleeping.
     }
-    if (start_ns > 0) {
-        rwlock->writer_csite.duration_ns = butil::cpuwide_time_ns() - start_ns;
-        rwlock->writer_csite.sampling_range = sampling_range;
-    }
-    rwlock->wlock_flag = true;
-    return 0;
-}
-#undef DO_CSITE_IF_NEED
-#undef SUBMIT_CSITE_IF_NEED
 
-static inline int rwlock_wrlock(bthread_rwlock_t* rwlock) {
-    return rwlock_wrlock_impl(rwlock, NULL);
+    // Failure path: snapshot errno before cleanup, because
+    // bthread_mutex_unlock / butex_wake_all inside cleanup may invoke
+    // syscalls or yield and clobber errno on this thread.
+    int saved_errno = errno;
+    // Submit the elapsed wait directly; we never reached unwrlock().
+    submit_contention_if_sampled(start_ns, sampling_range);
+    rwlock_wrlock_cleanup(rwlock, true);
+    return saved_errno;
 }
 
-static inline int rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
-                                     const struct timespec* __restrict 
abstime) {
-    return rwlock_wrlock_impl(rwlock, abstime);
-}
-
-static inline int rwlock_trywrlock(bthread_rwlock_t* rwlock) {
-    int rc = bthread_mutex_trylock(&rwlock->write_queue_mutex);
-    if (0 != rc) {
-        return rc;
-    }
-
-    int expected = 0;
-    if (!((butil::atomic<int>*)&rwlock->reader_count)
-            ->compare_exchange_strong(expected, -RWLockMaxReaders,
-                                      butil::memory_order_acquire,
-                                      butil::memory_order_relaxed)) {
-        // Failed to acquire the write lock because there are active readers.
-        bthread_mutex_unlock(&rwlock->write_queue_mutex);
-        return EBUSY;
-    }
-    rwlock->wlock_flag = true;
-
-    return 0;
-}
+static int rwlock_unwrlock(bthread_rwlock_t* rwlock) {
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    auto writer_wait_count = 
(butil::atomic<unsigned>*)rwlock->writer_wait_count;
 
-static inline void rwlock_unwrlock_slow(bthread_rwlock_t* rwlock, int 
reader_count) {
-    bthread_sem_post_n(&rwlock->reader_sema, reader_count);
-    // Allow other writers to proceed.
-    bthread_mutex_unlock(&rwlock->write_queue_mutex);
-}
-
-static inline int rwlock_unwrlock(bthread_rwlock_t* rwlock) {
-    rwlock->wlock_flag = false;
+    // Sampling state for the contention profiler. unwrlock() samples
+    // independently of wrlock(): although the release-CAS itself cannot
+    // fail due to writer-writer contention (writers are serialized by
+    // writer_queue_mutex), the body still does mutex_unlock(),
+    // butex_wake_all() and may spuriously spin on the weak CAS, all of
+    // which contribute to the critical-section tail latency.
+    size_t sampling_range = bvar::INVALID_SAMPLING_RANGE;
+    int64_t start_ns = 0;
+    BTHREAD_RWLOCK_MAYBE_START_SAMPLING;
 
-    // Announce to readers there is no active writer.
-    int reader_count = ((butil::atomic<int>*)&rwlock->reader_count)->fetch_add(
-        RWLockMaxReaders, butil::memory_order_release) + RWLockMaxReaders;
-    if (BAIDU_UNLIKELY(reader_count >= RWLockMaxReaders)) {
-        CHECK(false) << "rwlock_unwlock of unlocked rwlock";
-        return EINVAL;
-    }
+    while (true) {
+        unsigned l = lock_word->load(butil::memory_order_relaxed);
+        // Misuse detection: we must currently hold the write lock.
+        if (BAIDU_UNLIKELY(l != (unsigned)(1 << 31))) {
+            LOG(ERROR) << "Invalid unwrlock!";
+            return EINVAL;
+        }
+        // Release-CAS publishes all writes performed under the write lock
+        // to the next acquirer (a reader's acquire-CAS or another writer's
+        // acquire-CAS). The CAS itself cannot fail due to contention since
+        // writers are serialized by writer_queue_mutex; weak failure here is
+        // only a spurious CAS failure -- just retry.
+        if (!lock_word->compare_exchange_weak(l, 0,
+                                              butil::memory_order_release,
+                                              butil::memory_order_relaxed)) {
+            continue;
+        }
 
-    bool is_valid = bthread::is_contention_site_valid(rwlock->writer_csite);
-    if (BAIDU_UNLIKELY(is_valid)) {
-        bthread_contention_site_t saved_csite = rwlock->writer_csite;
-        bthread::make_contention_site_invalid(&rwlock->writer_csite);
+        // ---- Order of the next two operations is INTENTIONAL ----
+        //
+        // We deliberately:
+        //   (1) unlock writer_queue_mutex FIRST, then
+        //   (2) fetch_sub(writer_wait_count) and conditionally wake readers.
+        //
+        // Rationale (writer-priority semantics):
+        //   * Any writer queued on writer_queue_mutex has already
+        //     fetch_add'ed its share into writer_wait_count back in wrlock()
+        //     (before it even tried to lock the mutex). So when it wakes
+        //     up here and we later fetch_sub, the counter still reflects
+        //     "there is at least one more writer in flight": w_old >= 2,
+        //     which means w != 1, which means we will NOT wake readers.
+        //     Readers must keep yielding to the next writer -- exactly the
+        //     writer-priority invariant.
+        //   * Only when we are truly the last writer in flight (w_old == 1
+        //     after our fetch_sub, i.e. writer_wait_count is now 0) do we
+        //     wake_all readers parked on writer_wait_count.
+        //
+        // Subtle but harmless effect:
+        //   Between (1) and (2) there is a small window in which our
+        //   own "ghost share" is still counted in writer_wait_count even though
+        //   we have effectively left. New readers entering rdlock() during
+        //   this window will see writer_wait_count >= 1 and park on it; they
+        //   will be woken either by step (2) below (if no successor writer
+        //   appeared) or by the successor writer's eventual unwrlock.
+        //   No wakeup is ever lost: butex_wait re-checks the expected
+        //   value before truly sleeping, and any successor writer will
+        //   itself execute this same wake logic on its way out.
+        //
+        // Reversing the order (fetch_sub before unlock mutex) would break
+        // strict writer-priority because woken readers could grab the
+        // read lock before a successor writer queued on the mutex even
+        // gets a chance to CAS lock_word.
+        bthread_mutex_unlock(&rwlock->writer_queue_mutex);
+        unsigned w = writer_wait_count->fetch_sub(1, 
butil::memory_order_relaxed);
+        if (w == 1) {
+            butex_wake_all(writer_wait_count);
+        }
 
-        const int64_t unlock_start_ns = butil::cpuwide_time_ns();
-        rwlock_unwrlock_slow(rwlock, reader_count);
-        const int64_t unlock_end_ns = butil::cpuwide_time_ns();
-        saved_csite.duration_ns += unlock_end_ns - unlock_start_ns;
-        bthread::submit_contention(saved_csite, unlock_end_ns);
-    } else {
-        rwlock_unwrlock_slow(rwlock, reader_count);
+        // Submit our own unwrlock-side sample (CAS spin + mutex_unlock +
+        // butex_wake_all). This is independent of the wrlock-side sample.
+        submit_contention_if_sampled(start_ns, sampling_range);
+        return 0;
     }
-
-    return 0;
 }
 
-static inline int rwlock_unlock(bthread_rwlock_t* rwlock) {
-    if (rwlock->wlock_flag) {
+// Generic unlock entry that dispatches to unwrlock/unrdlock by inspecting
+// `lock_word'. This is safe ONLY because the caller must already hold one of
+// the two locks: while holding a read lock the high bit of `lock_word' cannot
+// flip on, and while holding the write lock the low bits cannot be set.
+// Therefore a relaxed load is sufficient to make the dispatch decision.
+static int rwlock_unlock(bthread_rwlock_t* rwlock) {
+    auto lock_word = (butil::atomic<unsigned>*)rwlock->lock_word;
+    unsigned r = lock_word->load(butil::memory_order_relaxed);
+    if ((r >> 31) != 0) {
         return rwlock_unwrlock(rwlock);
     } else {
         return rwlock_unrdlock(rwlock);
     }
 }
 
-} // namespace bthread
-
-__BEGIN_DECLS
-
-int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
-                        const bthread_rwlockattr_t* __restrict) {
-    int rc = bthread_sem_init(&rwlock->reader_sema, 0);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        return rc;
+// Deleter that turns butex_create_checked()'s raw pointer into something
+// std::unique_ptr can clean up automatically. Using RAII here lets the
+// init-error paths just `return rc' without manually unwinding partial
+// allocations; ownership is `release()'d only on the all-success path.
+struct ButexDeleter {
+    // Forwards the butex to butex_destroy(); a NULL pointer is skipped.
+    void operator()(void* butex) const {
+        if (butex != NULL) {
+            butex_destroy(butex);
+        }
     }
-    bthread_sem_disable_csite(&rwlock->reader_sema);
-    rc = bthread_sem_init(&rwlock->writer_sema, 0);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        bthread_sem_destroy(&rwlock->reader_sema);
-        return rc;
+};
+
+// Creates the `writer_wait_count' and `lock_word' butexes, sets both to 0,
+// and initializes `writer_queue_mutex' with csite disabled.
+// Returns 0 on success, ENOMEM when either butex cannot be created, or the
+// non-zero code returned by bthread_mutex_init(). The butex pointers in
+// `rwlock' are assigned only on the success path.
+static int rwlock_init(bthread_rwlock_t* rwlock) {
+    std::unique_ptr<unsigned, ButexDeleter> writer_wait_count(
+    butex_create_checked<unsigned>());
+    if (writer_wait_count == NULL) {
+        LOG(ERROR) << "Fail to create writer_wait_count butex: out of memory";
+        return ENOMEM;
     }
-    bthread_sem_disable_csite(&rwlock->writer_sema);
-
-    rwlock->reader_count = 0;
-    rwlock->reader_wait = 0;
-    rwlock->wlock_flag = false;
+    std::unique_ptr<unsigned, ButexDeleter> 
lock_word(butex_create_checked<unsigned>());
+    if (lock_word == NULL) {
+        LOG(ERROR) << "Fail to create lock_word butex: out of memory";
+        return ENOMEM;
+    }
+    *writer_wait_count = 0;
+    *lock_word = 0;
 
     bthread_mutexattr_t attr;
     bthread_mutexattr_init(&attr);
+    // Runs on every exit from this scope, including the error return below.
+    BRPC_SCOPE_EXIT { bthread_mutexattr_destroy(&attr); };
+    // Disable csite on the inner queue mutex so the writer's wait time is
+    // accounted exactly once -- by the rwlock layer, not double-counted via
+    // the inner mutex.
     bthread_mutexattr_disable_csite(&attr);
-    rc = bthread_mutex_init(&rwlock->write_queue_mutex, &attr);
-    if (BAIDU_UNLIKELY(0 != rc)) {
-        bthread_sem_destroy(&rwlock->reader_sema);
-        bthread_sem_destroy(&rwlock->writer_sema);
+    const int rc = bthread_mutex_init(&rwlock->writer_queue_mutex, &attr);
+    if (rc != 0) {
+        LOG(ERROR) << "Fail to init writer_queue_mutex, rc=" << rc;
         return rc;
     }
-    bthread_mutexattr_destroy(&attr);
-
-    bthread::make_contention_site_invalid(&rwlock->writer_csite);
 
+    // All resources successfully created; transfer butex ownership to
+    // rwlock. From here on, bthread_rwlock_destroy() is responsible for
+    // releasing them.
+    rwlock->writer_wait_count = writer_wait_count.release();
+    rwlock->lock_word = lock_word.release();
     return 0;
 }
 
+// Tears down what rwlock_init() set up, in this order: the writer queue
+// mutex, then the `writer_wait_count' butex, then the `lock_word' butex.
+// A mutex-destroy failure is logged but does not stop the butex cleanup.
+// Each butex pointer is reset to NULL once destroyed, so a repeated destroy
+// skips it.
+// Returns the result of bthread_mutex_destroy().
+static int rwlock_destroy(bthread_rwlock_t* rwlock) {
+    const int mutex_rc = bthread_mutex_destroy(&rwlock->writer_queue_mutex);
+    if (0 != mutex_rc) {
+        LOG(ERROR) << "Fail to destroy writer_queue_mutex, rc=" << mutex_rc;
+    }
+
+    unsigned** const butex_slots[] = {
+        &rwlock->writer_wait_count,
+        &rwlock->lock_word,
+    };
+    for (unsigned** slot : butex_slots) {
+        if (NULL == *slot) {
+            continue;
+        }
+        butex_destroy(*slot);
+        *slot = NULL;
+    }
+    return mutex_rc;
+}
+
+} // namespace bthread
+
+__BEGIN_DECLS
+
+// C entry point: initializes `rwlock' by delegating to
+// bthread::rwlock_init() and returns its result. The attribute parameter is
+// unnamed and not used.
+int bthread_rwlock_init(bthread_rwlock_t* __restrict rwlock,
+                        const bthread_rwlockattr_t* __restrict) {
+    return bthread::rwlock_init(rwlock);
+}
+
+// C entry point: destroys `rwlock' by delegating to
+// bthread::rwlock_destroy() and returns its result.
 int bthread_rwlock_destroy(bthread_rwlock_t* rwlock) {
-    bthread_sem_destroy(&rwlock->reader_sema);
-    bthread_sem_destroy(&rwlock->writer_sema);
-    bthread_mutex_destroy(&rwlock->write_queue_mutex);
-    return 0;
+    return bthread::rwlock_destroy(rwlock);
 }
 
+// Read-lock entry point. Calls bthread::rwlock_rdlock() with `false' and
+// NULL as the extra arguments and returns its result.
+// NOTE(review): these presumably mean "not a try-lock" and "no deadline" --
+// confirm against bthread::rwlock_rdlock().
 int bthread_rwlock_rdlock(bthread_rwlock_t* rwlock) {
-    return bthread::rwlock_rdlock(rwlock);
+    return bthread::rwlock_rdlock(rwlock, false, NULL);
 }
 
+// Try-read-lock entry point. Calls bthread::rwlock_rdlock() with `true' and
+// NULL as the extra arguments and returns its result.
+// NOTE(review): `true' presumably selects the non-blocking path -- confirm
+// against bthread::rwlock_rdlock().
 int bthread_rwlock_tryrdlock(bthread_rwlock_t* rwlock) {
-    return bthread::rwlock_tryrdlock(rwlock);
+    return bthread::rwlock_rdlock(rwlock, true, NULL);
 }
 
+// Timed read-lock entry point. Forwards `abstime' to
+// bthread::rwlock_rdlock() with the flag argument `false' and returns its
+// result.
+// NOTE(review): `abstime' is presumably an absolute deadline -- confirm
+// against bthread::rwlock_rdlock().
 int bthread_rwlock_timedrdlock(bthread_rwlock_t* __restrict rwlock,
                                const struct timespec* __restrict abstime) {
-    return bthread::rwlock_timedrdlock(rwlock, abstime);
+    return bthread::rwlock_rdlock(rwlock, false, abstime);
 }
 
+// Write-lock entry point. Calls bthread::rwlock_wrlock() with `false' and
+// NULL as the extra arguments and returns its result.
+// NOTE(review): these presumably mean "not a try-lock" and "no deadline" --
+// confirm against bthread::rwlock_wrlock().
 int bthread_rwlock_wrlock(bthread_rwlock_t* rwlock) {
-    return bthread::rwlock_wrlock(rwlock);
+    return bthread::rwlock_wrlock(rwlock, false, NULL);
 }
 
+// Try-write-lock entry point. Calls bthread::rwlock_wrlock() with `true' and
+// NULL as the extra arguments and returns its result.
+// NOTE(review): `true' presumably selects the non-blocking path -- confirm
+// against bthread::rwlock_wrlock().
 int bthread_rwlock_trywrlock(bthread_rwlock_t* rwlock) {
-    return bthread::rwlock_trywrlock(rwlock);
+    return bthread::rwlock_wrlock(rwlock, true, NULL);
 }
 
+// Timed write-lock entry point. Forwards `abstime' to
+// bthread::rwlock_wrlock() with the flag argument `false' and returns its
+// result.
+// NOTE(review): `abstime' is presumably an absolute deadline -- confirm
+// against bthread::rwlock_wrlock().
 int bthread_rwlock_timedwrlock(bthread_rwlock_t* __restrict rwlock,
                                const struct timespec* __restrict abstime) {
-    return bthread::rwlock_timedwrlock(rwlock, abstime);
+    return bthread::rwlock_wrlock(rwlock, false, abstime);
 }
 
 int bthread_rwlock_unlock(bthread_rwlock_t* rwlock) {
diff --git a/src/bthread/types.h b/src/bthread/types.h
index 86148c93..d46de1e8 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -225,16 +225,26 @@ typedef struct bthread_sem_t {
 typedef struct bthread_rwlock_t {
 #if defined(__cplusplus)
     bthread_rwlock_t()
-        : reader_count(0), reader_wait(0), wlock_flag(false), writer_csite{} {}
+        : writer_wait_count(0), lock_word(NULL) {}
     DISALLOW_COPY_AND_ASSIGN(bthread_rwlock_t);
 #endif
-    bthread_sem_t reader_sema; // Semaphore for readers to wait for completing 
writers.
-    bthread_sem_t writer_sema; // Semaphore for writers to wait for completing 
readers.
-    int reader_count; // Number of pending readers.
-    int reader_wait; // Number of departing readers.
-    bool wlock_flag; // Flag used to indicate that a write lock has been held.
-    bthread_mutex_t write_queue_mutex; // Held if there are pending writers.
-    bthread_contention_site_t writer_csite;
+    // Number of writers currently in flight (used as a butex):
+    // writers waiting on writer_queue_mutex, writers waiting for
+    // lock_word == 0, and the writer currently holding the write lock
+    // are all counted here. Each writer accounts for itself: incremented
+    // at the very beginning of wrlock() and decremented at the very end
+    // of unwrlock()/cleanup(). Readers consult this field to honor
+    // writer-priority: any non-zero value parks new readers.
+    unsigned* writer_wait_count;
+    // Serializes writers so that at most one writer at a time races for
+    // lock_word. Other writers queue up on this mutex.
+    bthread_mutex_t writer_queue_mutex;
+    // Bit-packed atomic lock word (used as a butex):
+    //   bit 31  : 1 if the write lock is held, 0 otherwise.
+    //   bit 0~30: number of readers currently holding the read lock.
+    //   0       : unlocked.
+    // The high bit and the low 31 bits are mutually exclusive.
+    unsigned* lock_word;
 } bthread_rwlock_t;
 
 typedef struct {
diff --git a/test/bthread_rwlock_unittest.cpp b/test/bthread_rwlock_unittest.cpp
index 2da226cb..9a88051c 100644
--- a/test/bthread_rwlock_unittest.cpp
+++ b/test/bthread_rwlock_unittest.cpp
@@ -17,6 +17,7 @@
 
 #include <gtest/gtest.h>
 #include "gperftools_helper.h"
+#include "butil/atomicops.h"
 #include <bthread/rwlock.h>
 
 namespace {
@@ -286,6 +287,253 @@ TEST(RWLockTest, mix_thread_types) {
     ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
 }
 
+// Tests below verify the writer-priority semantics and the cleanup path
+// guarded by the design notes in bthread/rwlock.cpp.
+// Arguments handed to one wp_writer_fn/wp_reader_fn bthread. Initialized
+// positionally by the tests, so the field order matters.
+struct WriterPriorityArgs {
+    bthread_rwlock_t* rw; // lock under test
+    butil::atomic<int>* order; // shared counter bumped inside the critical section
+    int my_order; // sequence number captured inside the critical section
+    int hold_us; // value passed to bthread_usleep() while the lock is held
+};
+
+// bthread entry: takes the write lock on args->rw, records the next value
+// of the shared counter into my_order while inside the critical section,
+// keeps the lock across bthread_usleep(hold_us), then unlocks.
+// Always returns NULL.
+void* wp_writer_fn(void* arg) {
+    WriterPriorityArgs* const args = static_cast<WriterPriorityArgs*>(arg);
+    bthread_rwlock_t* const lock = args->rw;
+
+    EXPECT_EQ(0, bthread_rwlock_wrlock(lock));
+    const int ticket = args->order->fetch_add(1, butil::memory_order_relaxed);
+    args->my_order = ticket;
+    bthread_usleep(args->hold_us);
+    EXPECT_EQ(0, bthread_rwlock_unlock(lock));
+    return NULL;
+}
+
+// bthread entry: takes the read lock on args->rw, records the next value
+// of the shared counter into my_order while inside the critical section,
+// keeps the lock across bthread_usleep(hold_us), then unlocks.
+// Always returns NULL.
+void* wp_reader_fn(void* arg) {
+    WriterPriorityArgs* const args = static_cast<WriterPriorityArgs*>(arg);
+    bthread_rwlock_t* const lock = args->rw;
+
+    EXPECT_EQ(0, bthread_rwlock_rdlock(lock));
+    const int ticket = args->order->fetch_add(1, butil::memory_order_relaxed);
+    args->my_order = ticket;
+    bthread_usleep(args->hold_us);
+    EXPECT_EQ(0, bthread_rwlock_unlock(lock));
+    return NULL;
+}
+
+// Verifies the writer-priority invariant guarded by the order
+// "unlock writer_queue_mutex BEFORE fetch_sub(writer_wait_count)" in
+// rwlock_unwrlock(): once a writer is queued, any new reader arriving
+// later MUST yield to that writer.
+// NOTE: the arrival order of the writer and the late reader is established
+// only by the fixed bthread_usleep() pauses below, not by explicit
+// synchronization.
+TEST(RWLockTest, writer_priority) {
+    bthread_setconcurrency(8);
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    // (1) Main thread holds the read lock first.
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&rw));
+
+    butil::atomic<int> order(0);
+    // my_order starts at -1 and is overwritten with a value >= 0 once the
+    // bthread has entered its critical section.
+    WriterPriorityArgs warg  {&rw, &order, -1, 5000};
+    WriterPriorityArgs r2arg {&rw, &order, -1, 0};
+
+    // (2) Start a writer; it should park inside wrlock() because the read
+    //     lock is held. Sleep long enough for it to fetch_add into
+    //     writer_wait_count and reach the butex_wait on `lock_word'.
+    bthread_t wth;
+    ASSERT_EQ(0, bthread_start_urgent(&wth, NULL, wp_writer_fn, &warg));
+    bthread_usleep(50 * 1000);
+
+    // (3) Now spawn a fresh reader. By writer-priority it MUST observe
+    //     writer_wait_count > 0 and park on it (NOT join the active read
+    //     lock).
+    bthread_t r2th;
+    ASSERT_EQ(0, bthread_start_urgent(&r2th, NULL, wp_reader_fn, &r2arg));
+    bthread_usleep(50 * 1000);
+
+    // (4) Release the original read lock. The writer should win the race
+    //     and complete BEFORE the queued reader.
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+    bthread_join(wth, NULL);
+    bthread_join(r2th, NULL);
+
+    // Both bthreads must have run their critical section, and the writer
+    // must have taken the smaller sequence number.
+    EXPECT_GE(warg.my_order, 0);
+    EXPECT_GE(r2arg.my_order, 0);
+    EXPECT_LT(warg.my_order, r2arg.my_order)
+        << "Writer-priority violated: writer entered with order="
+        << warg.my_order << " but late reader entered with order="
+        << r2arg.my_order;
+
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+// bthread entry: attempts a timed write lock whose deadline comes from
+// butil::milliseconds_from_now(50) and expects the attempt to end with
+// ETIMEDOUT. Always returns NULL.
+void* wp_timed_wrlock_short(void* arg) {
+    bthread_rwlock_t* const lock = static_cast<bthread_rwlock_t*>(arg);
+    const timespec deadline = butil::milliseconds_from_now(50);
+    EXPECT_EQ(ETIMEDOUT, bthread_rwlock_timedwrlock(lock, &deadline));
+    return NULL;
+}
+
+// Checks that writers which give up with ETIMEDOUT leave nothing behind in
+// writer_wait_count (the cleanup path of rwlock_wrlock_cleanup()).
+// While the main thread holds the read lock, several timed write-lock
+// attempts are started and joined. After the read lock is released, a timed
+// read lock must succeed, and is expected to return in under 100 ms; a
+// longer wait points at a leaked writer count.
+TEST(RWLockTest, wrlock_failure_does_not_leak_writer_count) {
+    bthread_setconcurrency(8);
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    // With a reader in place, every writer below has to wait on `lock_word'.
+    ASSERT_EQ(0, bthread_rwlock_rdlock(&rw));
+
+    const int kWriters = 8;
+    bthread_t writers[kWriters];
+    for (bthread_t* tid = writers; tid != writers + kWriters; ++tid) {
+        ASSERT_EQ(0, bthread_start_urgent(tid, NULL, wp_timed_wrlock_short, &rw));
+    }
+    // Once joined, each attempt has timed out and gone through cleanup.
+    for (bthread_t* tid = writers; tid != writers + kWriters; ++tid) {
+        bthread_join(*tid, NULL);
+    }
+
+    // No writer is in flight any more, so after dropping the read lock a
+    // new reader MUST get in right away.
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+
+    timespec deadline = butil::milliseconds_from_now(500);
+    butil::Timer stopwatch;
+    stopwatch.start();
+    ASSERT_EQ(0, bthread_rwlock_timedrdlock(&rw, &deadline));
+    stopwatch.stop();
+    EXPECT_LT(stopwatch.m_elapsed(), 100)
+        << "Reader was blocked for " << stopwatch.m_elapsed() << "ms; "
+        << "writer_wait_count was likely leaked by the cleanup path.";
+
+    ASSERT_EQ(0, bthread_rwlock_unlock(&rw));
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+// Per-bthread state for dc_worker; one instance per writer or reader.
+struct DataConsistencyArgs {
+    bthread_rwlock_t* rw;  // lock guarding *shared
+    int64_t* shared;       // protected by rw
+    int64_t local_inc;     // writer: number of increments this thread did
+    int64_t observed_max;  // reader: max value observed
+    bool is_writer;        // selects the writer or the reader branch of dc_worker
+};
+
+// bthread entry for the data_consistency test. Until g_stopped becomes true
+// it repeats one of two actions, chosen by is_writer:
+//   writer: take the write lock, bump *shared and local_inc, unlock;
+//   reader: take the read lock, remember the largest *shared seen, unlock.
+// Always returns NULL.
+void* dc_worker(void* arg) {
+    DataConsistencyArgs* const self = static_cast<DataConsistencyArgs*>(arg);
+    while (!g_stopped) {
+        if (!self->is_writer) {
+            EXPECT_EQ(0, bthread_rwlock_rdlock(self->rw));
+            const int64_t seen = *self->shared;
+            if (self->observed_max < seen) {
+                self->observed_max = seen;
+            }
+            EXPECT_EQ(0, bthread_rwlock_unlock(self->rw));
+            continue;
+        }
+        EXPECT_EQ(0, bthread_rwlock_wrlock(self->rw));
+        ++(*self->shared);
+        ++self->local_inc;
+        EXPECT_EQ(0, bthread_rwlock_unlock(self->rw));
+    }
+    return NULL;
+}
+
+// Verifies the release/acquire memory ordering pair on `lock_word'.
+// If the CAS in unwrlock()/unrdlock() weren't release-ordered, or the
+// CAS in rdlock()/wrlock() weren't acquire-ordered, writes done inside
+// the critical section could appear lost or inconsistent to other
+// threads, causing the final counter to disagree with total writer ops.
+//
+// W writers and R readers run dc_worker until g_stopped is set. Afterwards
+// the sum of all writers' local_inc must equal `shared', and no reader may
+// have observed a value above the final `shared'.
+TEST(RWLockTest, data_consistency) {
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    g_stopped = false;
+    const int W = 4;
+    const int R = 8;
+    bthread_setconcurrency(W + R + 4);
+
+    int64_t shared = 0;
+    std::vector<DataConsistencyArgs> args(W + R);
+    std::vector<bthread_t> threads(W + R);
+    // Fill in every argument block before any worker starts running.
+    for (int i = 0; i < W + R; ++i) {
+        args[i].rw = &rw;
+        args[i].shared = &shared;
+        args[i].local_inc = 0;
+        args[i].observed_max = -1;
+        args[i].is_writer = (i < W);
+    }
+
+    // A failed spawn must not return from the test body: workers that are
+    // already running keep pointers to `rw', `shared' and `args', which are
+    // destroyed when this function exits. Record the failure with EXPECT,
+    // stop spawning, and fall through to the stop/join sequence below.
+    int started = 0;
+    while (started < W + R) {
+        const int rc = bthread_start_urgent(&threads[started], NULL, dc_worker,
+                                            &args[started]);
+        EXPECT_EQ(0, rc) << "Fail to start worker " << started;
+        if (rc != 0) {
+            break;
+        }
+        ++started;
+    }
+
+    if (started == W + R) {
+        bthread_usleep(500 * 1000);
+    }
+    g_stopped = true;
+
+    // Join only the workers that were actually started.
+    int64_t total_inc = 0;
+    for (int i = 0; i < started; ++i) {
+        bthread_join(threads[i], NULL);
+        if (args[i].is_writer) {
+            total_inc += args[i].local_inc;
+        }
+    }
+
+    // No lost updates: every writer's increment is reflected in `shared'.
+    EXPECT_EQ(total_inc, shared)
+        << "Lost updates: total writer ops=" << total_inc
+        << " but shared counter=" << shared;
+    // No reader saw a value greater than the final counter.
+    for (int i = W; i < W + R; ++i) {
+        EXPECT_LE(args[i].observed_max, shared)
+            << "Reader " << i << " observed_max=" << args[i].observed_max
+            << " > final shared=" << shared;
+    }
+
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
+// bthread entry: until g_stopped becomes true, repeatedly takes the read
+// lock, holds it across bthread_usleep(100) so the lock stays busy, and
+// releases it. Always returns NULL.
+void* ws_reader_loop(void* arg) {
+    bthread_rwlock_t* const lock = static_cast<bthread_rwlock_t*>(arg);
+    for (;;) {
+        if (g_stopped) {
+            break;
+        }
+        EXPECT_EQ(0, bthread_rwlock_rdlock(lock));
+        bthread_usleep(100);
+        EXPECT_EQ(0, bthread_rwlock_unlock(lock));
+    }
+    return NULL;
+}
+
+// Verifies that under a continuous read load, a writer can still acquire
+// the lock in bounded time. This is the end-to-end guarantee of the
+// writer-priority strategy: any reader arriving AFTER the writer entered
+// wrlock() must yield, ensuring the writer never starves.
+//
+// Failures after the readers have been started are reported with EXPECT_*
+// instead of ASSERT_*: the readers loop on the stack-allocated `rw' until
+// g_stopped is set, so the test must always reach the stop/join sequence
+// before `rw' goes out of scope.
+TEST(RWLockTest, no_writer_starvation) {
+    bthread_rwlock_t rw;
+    ASSERT_EQ(0, bthread_rwlock_init(&rw, NULL));
+
+    g_stopped = false;
+    const int R = 16;
+    bthread_setconcurrency(R + 4);
+    bthread_t rth[R];
+    int started = 0;
+    while (started < R) {
+        const int rc = bthread_start_urgent(&rth[started], NULL, ws_reader_loop, &rw);
+        EXPECT_EQ(0, rc) << "Fail to start reader " << started;
+        if (rc != 0) {
+            break;
+        }
+        ++started;
+    }
+
+    // Let the readers ramp up and saturate the lock.
+    bthread_usleep(50 * 1000);
+
+    // A single writer must succeed within a generous budget.
+    butil::Timer t;
+    t.start();
+    const int wr_rc = bthread_rwlock_wrlock(&rw);
+    t.stop();
+    EXPECT_EQ(0, wr_rc);
+
+    if (wr_rc == 0) {
+        EXPECT_LT(t.m_elapsed(), 1000)
+            << "Writer starved for " << t.m_elapsed() << "ms under "
+            << R << " concurrent readers; writer-priority is broken.";
+        EXPECT_EQ(0, bthread_rwlock_unlock(&rw));
+    }
+
+    // Always stop and join the readers that were started, whatever happened
+    // above, before the lock is destroyed.
+    g_stopped = true;
+    for (int i = 0; i < started; ++i) {
+        bthread_join(rth[i], NULL);
+    }
+    ASSERT_EQ(0, bthread_rwlock_destroy(&rw));
+}
+
 struct BAIDU_CACHELINE_ALIGNMENT PerfArgs {
     bthread_rwlock_t* rw;
     int64_t counter;
@@ -386,13 +634,14 @@ void PerfTest(uint32_t writer_ratio, ThreadId* /*dummy*/, int thread_num,
               << " writer_ratio=" << writer_ratio
               << " reader_num=" << reader_num
               << " read_count=" << read_count
-              << " read_average_time=" << (read_count == 0 ? 0 : read_wait_time / (double)read_count)
+              << " read_average_time=" << (read_count == 0 ? 0 : read_wait_time / (double)read_count) << "ns"
               << " writer_num=" << writer_num
               << " write_count=" << write_count
-              << " write_average_time=" << (write_count == 0 ? 0 : write_wait_time / (double)write_count);
+              << " write_average_time=" << (write_count == 0 ? 0 : write_wait_time / (double)write_count) << "ns";
 }
 
 TEST(RWLockTest, performance) {
+    bthread_setconcurrency(16);
     const int thread_num = 12;
     PerfTest(0, (pthread_t*)NULL, thread_num, pthread_create, pthread_join);
     PerfTest(0, (bthread_t*)NULL, thread_num, bthread_start_background, bthread_join);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to