This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 21a6aab1 New DoublyBufferedData for suspended bthread (#2225)
21a6aab1 is described below

commit 21a6aab1bed6f7febfabd5657ded59a9077fbae0
Author: Bright Chen <[email protected]>
AuthorDate: Sun Jun 25 14:33:16 2023 +0800

    New DoublyBufferedData for suspended bthread (#2225)
    
    * New DoublyBufferedData for bthread
    
    * Use cond and ref array
    
    * DoublyBufferedData allows bthread suspended
    
    * optimize cond and var name
---
 src/butil/containers/doubly_buffered_data.h | 242 ++++++++++++++-----
 test/brpc_load_balancer_unittest.cpp        | 350 ++++++++++++++++------------
 2 files changed, 392 insertions(+), 200 deletions(-)

diff --git a/src/butil/containers/doubly_buffered_data.h 
b/src/butil/containers/doubly_buffered_data.h
index c928037b..024a5ee8 100644
--- a/src/butil/containers/doubly_buffered_data.h
+++ b/src/butil/containers/doubly_buffered_data.h
@@ -31,6 +31,7 @@
 #include "butil/errno.h"
 #include "butil/atomicops.h"
 #include "butil/unique_ptr.h"
+#include "butil/type_traits.h"
 
 namespace butil {
 
@@ -40,7 +41,8 @@ namespace butil {
 // modifications of data. As a side effect, this data structure can store
 // a thread-local data for user.
 //
-// Read(): begin with a thread-local mutex locked then read the foreground
+// --- `AllowBthreadSuspended=false' ---
+// Read(): Begin with a thread-local mutex locked then read the foreground
 // instance which will not be changed before the mutex is unlocked. Since the
 // mutex is only locked by Modify() with an empty critical section, the
 // function is almost lock-free.
@@ -49,10 +51,38 @@ namespace butil {
 // foreground and background, lock thread-local mutexes one by one to make
 // sure all existing Read() finish and later Read() see new foreground,
 // then modify background(foreground before flip) again.
+//
+// But, when `AllowBthreadSuspended=false', it is not allowed to suspend 
bthread
+// while reading. Otherwise, it may cause deadlock.
+//
+//
+// --- `AllowBthreadSuspended=true' ---
+// It is allowed to suspend bthread while reading.
+// It is not allowed to use non-Void TLS.
+// If bthread will not be suspended while reading, it also makes Read() almost
+// lock-free by making Modify() *much* slower.
+// If bthread will be suspended while reading, there is competition among
+// bthreads using the same Wrapper.
+//
+// Read(): Begin with thread-local reference count of foreground instance
+// incremented by one which be protected by a thread-local mutex, then read
+// the foreground instance which will not be changed before its all 
thread-local
+// reference count become zero. At last, after the query completes, 
thread-local
+// reference count of foreground instance will be decremented by one, and if
+// it becomes zero, notifies Modify().
+//
+// Modify(): Modify background instance which is not used by any Read(), flip
+// foreground and background, lock thread-local mutexes one by one and wait
+// until thread-local reference counts which be protected by a thread-local
+// mutex become 0 to make sure all existing Read() finish and later Read()
+// see new foreground, then modify background(foreground before flip) again.
 
 class Void { };
 
-template <typename T, typename TLS = Void>
+template <typename T> struct IsVoid : false_type { };
+template <> struct IsVoid<Void> : true_type { };
+
+template <typename T, typename TLS = Void, bool AllowBthreadSuspended = false>
 class DoublyBufferedData {
     class Wrapper;
     class WrapperTLSGroup;
@@ -61,10 +91,14 @@ public:
     class ScopedPtr {
     friend class DoublyBufferedData;
     public:
-        ScopedPtr() : _data(NULL), _w(NULL) {}
+        ScopedPtr() : _data(NULL), _index(0), _w(NULL) {}
         ~ScopedPtr() {
             if (_w) {
-                _w->EndRead();
+                if (AllowBthreadSuspended) {
+                    _w->EndRead(_index);
+                } else {
+                    _w->EndRead();
+                }
             }
         }
         const T* get() const { return _data; }
@@ -75,6 +109,8 @@ public:
     private:
         DISALLOW_COPY_AND_ASSIGN(ScopedPtr);
         const T* _data;
+        // Index of foreground instance used by ScopedPtr.
+        int _index;
         Wrapper* _w;
     };
     
@@ -164,8 +200,15 @@ private:
         const Arg2& _arg2;
     };
 
-    const T* UnsafeRead() const
-    { return _data + _index.load(butil::memory_order_acquire); }
+    const T* UnsafeRead() const {
+        return _data + _index.load(butil::memory_order_acquire);
+    }
+
+    const T* UnsafeRead(int& index) const {
+        index = _index.load(butil::memory_order_acquire);
+        return _data + index;
+    }
+
     Wrapper* AddWrapper(Wrapper*);
     void RemoveWrapper(Wrapper*);
 
@@ -182,10 +225,10 @@ private:
     std::vector<Wrapper*> _wrappers;
 
     // Sequence access to _wrappers.
-    pthread_mutex_t _wrappers_mutex;
+    pthread_mutex_t _wrappers_mutex{};
 
     // Sequence modifications.
-    pthread_mutex_t _modify_mutex;
+    pthread_mutex_t _modify_mutex{};
 };
 
 static const pthread_key_t INVALID_PTHREAD_KEY = (pthread_key_t)-1;
@@ -206,8 +249,8 @@ class DoublyBufferedDataWrapperBase<T, Void> {
 // WrapperTLSGroup can store Wrapper in thread local storage.
 // WrapperTLSGroup will destruct Wrapper data when thread exits,
 // other times only reset Wrapper inner structure.
-template <typename T, typename TLS>
-class DoublyBufferedData<T, TLS>::WrapperTLSGroup {
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+class DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSGroup {
 public:
     const static size_t RAW_BLOCK_SIZE = 4096;
     const static size_t ELEMENTS_PER_BLOCK = (RAW_BLOCK_SIZE + sizeof(T) - 1) 
/ sizeof(T);
@@ -302,34 +345,49 @@ private:
     static __thread std::vector<ThreadBlock*>* _s_tls_blocks;
 };
 
-template <typename T, typename TLS>
-pthread_mutex_t DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_mutex = 
PTHREAD_MUTEX_INITIALIZER;
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+pthread_mutex_t DoublyBufferedData<T, TLS, 
AllowBthreadSuspended>::WrapperTLSGroup::_s_mutex = PTHREAD_MUTEX_INITIALIZER;
 
-template <typename T, typename TLS>
-std::deque<typename DoublyBufferedData<T, TLS>::WrapperTLSId>*
-        DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_free_ids = NULL;
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+std::deque<typename DoublyBufferedData<T, TLS, 
AllowBthreadSuspended>::WrapperTLSId>*
+        DoublyBufferedData<T, TLS, 
AllowBthreadSuspended>::WrapperTLSGroup::_s_free_ids = NULL;
 
-template <typename T, typename TLS>
-typename DoublyBufferedData<T, TLS>::WrapperTLSId
-        DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_id = 0;
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSId
+        DoublyBufferedData<T, TLS, 
AllowBthreadSuspended>::WrapperTLSGroup::_s_id = 0;
 
-template <typename T, typename TLS>
-__thread std::vector<typename DoublyBufferedData<T, 
TLS>::WrapperTLSGroup::ThreadBlock*>*
-        DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_tls_blocks = NULL;
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+__thread std::vector<typename DoublyBufferedData<T, TLS, 
AllowBthreadSuspended>::WrapperTLSGroup::ThreadBlock*>*
+        DoublyBufferedData<T, TLS, 
AllowBthreadSuspended>::WrapperTLSGroup::_s_tls_blocks = NULL;
 
-template <typename T, typename TLS>
-class DoublyBufferedData<T, TLS>::Wrapper
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+class DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Wrapper
     : public DoublyBufferedDataWrapperBase<T, TLS> {
 friend class DoublyBufferedData;
 public:
-    explicit Wrapper() : _control(NULL) {
+    explicit Wrapper()
+        : _control(NULL)
+        , _modify_wait(false) {
         pthread_mutex_init(&_mutex, NULL);
+        if (AllowBthreadSuspended) {
+            pthread_cond_init(&_cond[0], NULL);
+            pthread_cond_init(&_cond[1], NULL);
+        }
     }
     
     ~Wrapper() {
         if (_control != NULL) {
             _control->RemoveWrapper(this);
         }
+
+        if (AllowBthreadSuspended) {
+            WaitReadDone(0);
+            WaitReadDone(1);
+
+            pthread_cond_destroy(&_cond[0]);
+            pthread_cond_destroy(&_cond[1]);
+        }
+
         pthread_mutex_destroy(&_mutex);
     }
 
@@ -340,23 +398,77 @@ public:
         pthread_mutex_lock(&_mutex);
     }
 
+    // For `AllowBthreadSuspended=true'.
+    inline void BeginReadRelease() {
+        pthread_mutex_unlock(&_mutex);
+    }
+
     inline void EndRead() {
         pthread_mutex_unlock(&_mutex);
     }
 
+    // For `AllowBthreadSuspended=true'.
+    // Thread-local reference count which be protected by _mutex
+    // will be decremented by one.
+    inline void EndRead(int index) {
+        BAIDU_SCOPED_LOCK(_mutex);
+        SubRef(index);
+        SignalReadCond(index);
+    }
+
     inline void WaitReadDone() {
         BAIDU_SCOPED_LOCK(_mutex);
     }
+
+    // For `AllowBthreadSuspended=true'.
+    // Wait until all read of foreground instance done.
+    inline void WaitReadDone(int index) {
+        BAIDU_SCOPED_LOCK(_mutex);
+        int& ref = index == 0 ? _ref[0] : _ref[1];
+        while (ref != 0) {
+            _modify_wait = true;
+            pthread_cond_wait(&_cond[index], &_mutex);
+        }
+        _modify_wait = false;
+    }
+
+    // For `AllowBthreadSuspended=true'.
+    inline void SignalReadCond(int index) {
+        if (_ref[index] == 0 && _modify_wait) {
+            pthread_cond_signal(&_cond[index]);
+        }
+    }
+
+    // For `AllowBthreadSuspended=true'.
+    void AddRef(int index) {
+        ++_ref[index];
+    }
+
+    // For `AllowBthreadSuspended=true'.
+    void SubRef(int index) {
+        --_ref[index];
+    }
     
 private:
     DoublyBufferedData* _control;
-    pthread_mutex_t _mutex;
+    pthread_mutex_t _mutex{};
+    // For `AllowBthreadSuspended=true'.
+    // _cond[0] for _ref[0], _cond[1] for _ref[1]
+    pthread_cond_t _cond[2]{};
+    // For `AllowBthreadSuspended=true'.
+    // _ref[0] is reference count for _data[0],
+    // _ref[1] is reference count for _data[1].
+    int _ref[2]{0, 0};
+    // For `AllowBthreadSuspended=true'.
+    // Whether there is a Modify() waiting for _ref0/_ref1.
+    bool _modify_wait;
 };
 
 // Called when thread initializes thread-local wrapper.
-template <typename T, typename TLS>
-typename DoublyBufferedData<T, TLS>::Wrapper* DoublyBufferedData<T, 
TLS>::AddWrapper(
-        typename DoublyBufferedData<T, TLS>::Wrapper* w) {
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Wrapper*
+DoublyBufferedData<T, TLS, AllowBthreadSuspended>::AddWrapper(
+        typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Wrapper* 
w) {
     if (NULL == w) {
         return NULL;
     }
@@ -378,9 +490,9 @@ typename DoublyBufferedData<T, TLS>::Wrapper* 
DoublyBufferedData<T, TLS>::AddWra
 }
 
 // Called when thread quits.
-template <typename T, typename TLS>
-void DoublyBufferedData<T, TLS>::RemoveWrapper(
-    typename DoublyBufferedData<T, TLS>::Wrapper* w) {
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+void DoublyBufferedData<T, TLS, AllowBthreadSuspended>::RemoveWrapper(
+    typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Wrapper* w) {
     if (NULL == w) {
         return;
     }
@@ -394,10 +506,13 @@ void DoublyBufferedData<T, TLS>::RemoveWrapper(
     }
 }
 
-template <typename T, typename TLS>
-DoublyBufferedData<T, TLS>::DoublyBufferedData()
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+DoublyBufferedData<T, TLS, AllowBthreadSuspended>::DoublyBufferedData()
     : _index(0)
     , _wrapper_key(0) {
+    static_assert(!(AllowBthreadSuspended && !IsVoid<TLS>::value),
+                  "Forbidden to allow suspend bthread with non-Void TLS");
+
     _wrappers.reserve(64);
     pthread_mutex_init(&_modify_mutex, NULL);
     pthread_mutex_init(&_wrappers_mutex, NULL);
@@ -411,8 +526,8 @@ DoublyBufferedData<T, TLS>::DoublyBufferedData()
     }
 }
 
-template <typename T, typename TLS>
-DoublyBufferedData<T, TLS>::~DoublyBufferedData() {
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+DoublyBufferedData<T, TLS, AllowBthreadSuspended>::~DoublyBufferedData() {
     // User is responsible for synchronizations between Read()/Modify() and
     // this function.
     
@@ -429,23 +544,37 @@ DoublyBufferedData<T, TLS>::~DoublyBufferedData() {
     pthread_mutex_destroy(&_wrappers_mutex);
 }
 
-template <typename T, typename TLS>
-int DoublyBufferedData<T, TLS>::Read(
-    typename DoublyBufferedData<T, TLS>::ScopedPtr* ptr) {
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+int DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Read(
+    typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ScopedPtr* 
ptr) {
     Wrapper* p = WrapperTLSGroup::get_or_create_tls_data(_wrapper_key);
     Wrapper* w = AddWrapper(p);
     if (BAIDU_LIKELY(w != NULL)) {
-        w->BeginRead();
-        ptr->_data = UnsafeRead();
-        ptr->_w = w;
+        if (AllowBthreadSuspended) {
+            // Use reference count instead of mutex to indicate read of
+            // foreground instance, so during the read process, there is
+            // no need to lock mutex and bthread is allowed to be suspended.
+            w->BeginRead();
+            int index = -1;
+            ptr->_data = UnsafeRead(index);
+            ptr->_index = index;
+            w->AddRef(index);
+            ptr->_w = w;
+            w->BeginReadRelease();
+        } else {
+            w->BeginRead();
+            ptr->_data = UnsafeRead();
+            ptr->_w = w;
+        }
+
         return 0;
     }
     return -1;
 }
 
-template <typename T, typename TLS>
+template <typename T, typename TLS, bool AllowBthreadSuspended>
 template <typename Fn>
-size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn) {
+size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn& fn) {
     // _modify_mutex sequences modifications. Using a separate mutex rather
     // than _wrappers_mutex is to avoid blocking threads calling
     // AddWrapper() or RemoveWrapper() too long. Most of the time, 
modifications
@@ -471,7 +600,12 @@ size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn) {
     {
         BAIDU_SCOPED_LOCK(_wrappers_mutex);
         for (size_t i = 0; i < _wrappers.size(); ++i) {
-            _wrappers[i]->WaitReadDone();
+            // Wait read of old foreground instance done.
+            if (AllowBthreadSuspended) {
+                _wrappers[i]->WaitReadDone(bg_index);
+            } else {
+                _wrappers[i]->WaitReadDone();
+            }
         }
     }
 
@@ -480,38 +614,38 @@ size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn) {
     return ret2;
 }
 
-template <typename T, typename TLS>
+template <typename T, typename TLS, bool AllowBthreadSuspended>
 template <typename Fn, typename Arg1>
-size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn, const Arg1& arg1) {
+size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn& fn, const 
Arg1& arg1) {
     Closure1<Fn, Arg1> c(fn, arg1);
     return Modify(c);
 }
 
-template <typename T, typename TLS>
+template <typename T, typename TLS, bool AllowBthreadSuspended>
 template <typename Fn, typename Arg1, typename Arg2>
-size_t DoublyBufferedData<T, TLS>::Modify(
+size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(
     Fn& fn, const Arg1& arg1, const Arg2& arg2) {
     Closure2<Fn, Arg1, Arg2> c(fn, arg1, arg2);
     return Modify(c);
 }
 
-template <typename T, typename TLS>
+template <typename T, typename TLS, bool AllowBthreadSuspended>
 template <typename Fn>
-size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(Fn& fn) {
+size_t DoublyBufferedData<T, TLS, 
AllowBthreadSuspended>::ModifyWithForeground(Fn& fn) {
     WithFG0<Fn> c(fn, _data);
     return Modify(c);
 }
 
-template <typename T, typename TLS>
+template <typename T, typename TLS, bool AllowBthreadSuspended>
 template <typename Fn, typename Arg1>
-size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(Fn& fn, const Arg1& 
arg1) {
+size_t DoublyBufferedData<T, TLS, 
AllowBthreadSuspended>::ModifyWithForeground(Fn& fn, const Arg1& arg1) {
     WithFG1<Fn, Arg1> c(fn, _data, arg1);
     return Modify(c);
 }
 
-template <typename T, typename TLS>
+template <typename T, typename TLS, bool AllowBthreadSuspended>
 template <typename Fn, typename Arg1, typename Arg2>
-size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(
+size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ModifyWithForeground(
     Fn& fn, const Arg1& arg1, const Arg2& arg2) {
     WithFG2<Fn, Arg1, Arg2> c(fn, _data, arg1, arg2);
     return Modify(c);
diff --git a/test/brpc_load_balancer_unittest.cpp 
b/test/brpc_load_balancer_unittest.cpp
index 9cf6894b..019f40dd 100644
--- a/test/brpc_load_balancer_unittest.cpp
+++ b/test/brpc_load_balancer_unittest.cpp
@@ -20,13 +20,10 @@
 // Date: Sun Jul 13 15:04:18 CST 2014
 
 #include <sys/types.h>
-#include <sys/socket.h>
 #include <map>
 #include <gtest/gtest.h>
 #include "bthread/bthread.h"
 #include "butil/gperftools_profiler.h"
-#include "butil/time.h"
-#include "butil/fast_rand.h"
 #include "butil/containers/doubly_buffered_data.h"
 #include "brpc/describable.h"
 #include "brpc/socket.h"
@@ -34,7 +31,6 @@
 #include "brpc/global.h"
 #include "brpc/details/load_balancer_with_naming.h"
 #include "butil/strings/string_number_conversions.h"
-#include "brpc/excluded_servers.h" 
 #include "brpc/policy/weighted_round_robin_load_balancer.h"
 #include "brpc/policy/round_robin_load_balancer.h"
 #include "brpc/policy/weighted_randomized_load_balancer.h"
@@ -42,10 +38,8 @@
 #include "brpc/policy/locality_aware_load_balancer.h"
 #include "brpc/policy/consistent_hashing_load_balancer.h"
 #include "brpc/policy/hasher.h"
-#include "brpc/errno.pb.h"
 #include "echo.pb.h"
 #include "brpc/channel.h"
-#include "brpc/controller.h"
 #include "brpc/server.h"
 
 namespace brpc {
@@ -74,17 +68,7 @@ protected:
     };
 };
 
-size_t TLS_ctor = 0;
-size_t TLS_dtor = 0;
-struct TLS {
-    TLS() {
-        ++TLS_ctor;
-    }
-    ~TLS() {
-        ++TLS_dtor;
-    }
-
-};
+class UserTLS {};
 
 struct Foo {
     Foo() : x(0) {}
@@ -96,36 +80,232 @@ bool AddN(Foo& f, int n) {
     return true;
 }
 
-TEST_F(LoadBalancerTest, doubly_buffered_data) {
+template <typename DBD>
+void test_doubly_buffered_data() {
     // test doubly_buffered_data TLS limits
     {
         std::cout << "current PTHREAD_KEYS_MAX: " << PTHREAD_KEYS_MAX << 
std::endl;
-        butil::DoublyBufferedData<Foo> data[PTHREAD_KEYS_MAX + 1];
-        butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
+        DBD data[PTHREAD_KEYS_MAX + 1];
+        typename DBD::ScopedPtr ptr;
         ASSERT_EQ(0, data[PTHREAD_KEYS_MAX].Read(&ptr));
         ASSERT_EQ(0, ptr->x);
     }
 
-    butil::DoublyBufferedData<Foo> d;
+    DBD d;
     {
-        butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
+        typename DBD::ScopedPtr ptr;
         ASSERT_EQ(0, d.Read(&ptr));
         ASSERT_EQ(0, ptr->x);
     }
     {
-        butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
+        typename DBD::ScopedPtr ptr;
         ASSERT_EQ(0, d.Read(&ptr));
         ASSERT_EQ(0, ptr->x);
     }
 
     d.Modify(AddN, 10);
     {
-        butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
+        typename DBD::ScopedPtr ptr;
         ASSERT_EQ(0, d.Read(&ptr));
         ASSERT_EQ(10, ptr->x);
     }
 }
 
+TEST_F(LoadBalancerTest, doubly_buffered_data) {
+    test_doubly_buffered_data<butil::DoublyBufferedData<Foo>>();
+    test_doubly_buffered_data<butil::DoublyBufferedData<Foo, butil::Void, 
false>>();
+    test_doubly_buffered_data<butil::DoublyBufferedData<Foo, UserTLS, 
false>>();
+    test_doubly_buffered_data<butil::DoublyBufferedData<Foo, butil::Void, 
true>>();
+}
+
+bool exitFlag = false;
+
+template <typename DBD>
+void* DBDBthread(void* arg) {
+    auto d = static_cast<DBD*>(arg);
+    while(!exitFlag){
+        typename DBD::ScopedPtr ptr;
+        d->Read(&ptr);
+
+        // If DBD is DoublyBufferedData<T, TLS, false>, may cause deadlock.
+        bthread_usleep(100 * 1000);
+    }
+
+    return NULL;
+}
+
+template <typename DBD>
+void DBDMultiBthread() {
+    DBD d;
+    d.Modify(AddN, 1);
+    {
+        typename DBD::ScopedPtr ptr;
+        ASSERT_EQ(0, d.Read(&ptr));
+        ASSERT_EQ(1, ptr->x);
+    }
+
+    bthread_t tids[10000];
+    for (size_t i = 0; i < ARRAY_SIZE(tids); ++i) {
+        ASSERT_EQ(0, bthread_start_urgent(&tids[i], NULL, DBDBthread<DBD>, 
&d));
+    }
+
+    // Modify during reading.
+    int64_t start = butil::gettimeofday_ms();
+    while (butil::gettimeofday_ms() - start < 10 * 1000) {
+        d.Modify(AddN, 1);
+        typename DBD::ScopedPtr ptr;
+        d.Read(&ptr);
+        usleep(100 * 1000);
+    }
+    exitFlag = true;
+    for (size_t i = 0; i < ARRAY_SIZE(tids); ++i) {
+        ASSERT_EQ(0, bthread_join(tids[i], NULL));
+    }
+}
+
+// Deadlock, only for test.
+// TEST_F(LoadBalancerTest, doubly_buffered_data_multi_bthread) {
+//     DBDMultiBthread<butil::DoublyBufferedData<Foo>>();
+//     DBDMultiBthread<butil::DoublyBufferedData<Foo, butil::Void, false>>();
+// }
+
+TEST_F(LoadBalancerTest, doubly_buffered_data_bthread_multi_bthread) {
+    DBDMultiBthread<butil::DoublyBufferedData<Foo, butil::Void, true>>();
+}
+
+
+bool g_started = false;
+bool g_stopped = false;
+int g_prof_name_counter = 0;
+
+using PerfMap = std::unordered_map<int, int>;
+
+bool AddMapN(PerfMap& f, int n) {
+    ++f[n];
+    return true;
+}
+
+template<typename DBD>
+struct BAIDU_CACHELINE_ALIGNMENT PerfArgs {
+    DBD* dbd;
+    int64_t counter;
+    int64_t elapse_ns;
+    bool ready;
+
+    PerfArgs() : dbd(NULL), counter(0), elapse_ns(0), ready(false) {}
+};
+
+template<typename DBD>
+void* read_dbd(void* void_arg) {
+    auto args = (PerfArgs<DBD>*)void_arg;
+    args->ready = true;
+    butil::Timer t;
+    while (!g_stopped) {
+        if (g_started) {
+            break;
+        }
+        bthread_usleep(10);
+    }
+    t.start();
+    while (!g_stopped) {
+        {
+            typename DBD::ScopedPtr ptr;
+            args->dbd->Read(&ptr);
+            // ptr->find(1);
+        }
+        ++args->counter;
+    }
+    t.stop();
+    args->elapse_ns = t.n_elapsed();
+    return NULL;
+}
+
+template<typename DBD>
+void PerfTest(int thread_num, bool modify_during_reading) {
+    g_started = false;
+    g_stopped = false;
+    DBD dbd;
+    for (int i = 0; i < 1024; ++i) {
+        dbd.Modify(AddMapN, i);
+    }
+    pthread_t threads[thread_num];
+    std::vector<PerfArgs<DBD>> args(thread_num);
+    for (int i = 0; i < thread_num; ++i) {
+        args[i].dbd = &dbd;
+        ASSERT_EQ(0, pthread_create(&threads[i], NULL, read_dbd<DBD>, 
&args[i]));
+    }
+    while (true) {
+        bool all_ready = true;
+        for (int i = 0; i < thread_num; ++i) {
+            if (!args[i].ready) {
+                all_ready = false;
+                break;
+            }
+        }
+        if (all_ready) {
+            break;
+        }
+        usleep(1000);
+    }
+    g_started = true;
+    char prof_name[32];
+    snprintf(prof_name, sizeof(prof_name), "doubly_buffered_data_%d.prof", 
++g_prof_name_counter);
+    ProfilerStart(prof_name);
+    int64_t run_ms = 5 * 1000;
+    if (modify_during_reading) {
+        int64_t start = butil::gettimeofday_ms();
+        int i = 1;
+        while (butil::gettimeofday_ms() - start < run_ms) {
+            ASSERT_TRUE(dbd.Modify(AddMapN, i++));
+            usleep(1000);
+        }
+    } else {
+        usleep(run_ms * 1000);
+    }
+    ProfilerStop();
+    g_stopped = true;
+    int64_t wait_time = 0;
+    int64_t count = 0;
+    for (int i = 0; i < thread_num; ++i) {
+        pthread_join(threads[i], NULL);
+        wait_time += args[i].elapse_ns;
+        count += args[i].counter;
+    }
+    LOG(INFO) << butil::class_name<DBD>()
+              << " thread_num=" << thread_num
+              << " modify_during_reading=" << modify_during_reading
+              << " count=" << count
+              << " average_time=" << wait_time / (double)count
+              << " qps=" << (double)count / wait_time * (1000 * 1000 * 1000);
+}
+
+TEST_F(LoadBalancerTest, dbd_performance) {
+    int thread_num = 1;
+    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
+    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
+    PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void, 
true>>(thread_num, false);
+    PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void, 
true>>(thread_num, true);
+
+    thread_num = 4;
+    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
+    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
+    PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void, 
true>>(thread_num, false);
+    PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void, 
true>>(thread_num, true);
+
+    thread_num = 8;
+    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
+    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
+    PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void, 
true>>(thread_num, false);
+    PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void, 
true>>(thread_num, true);
+
+    thread_num = 16;
+    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
+    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
+    PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void, 
true>>(thread_num, false);
+    PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void, 
true>>(thread_num, true);
+}
+
+
 typedef brpc::policy::LocalityAwareLoadBalancer LALB;
 
 static void ValidateWeightTree(
@@ -1101,126 +1281,4 @@ TEST_F(LoadBalancerTest, la_selection_too_long) {
     ASSERT_EQ(EHOSTDOWN, lb.SelectServer(in, &out));
 }
 
-bool g_started = false;
-bool g_stopped = false;
-int g_prof_name_counter = 0;
-
-using PerfMap = std::unordered_map<int, int>;
-
-bool AddMapN(PerfMap& f, int n) {
-    ++f[n];
-    return true;
-}
-
-template<typename DBD>
-struct BAIDU_CACHELINE_ALIGNMENT PerfArgs {
-    DBD* dbd;
-    int64_t counter;
-    int64_t elapse_ns;
-    bool ready;
-
-    PerfArgs() : dbd(NULL), counter(0), elapse_ns(0), ready(false) {}
-};
-
-template<typename DBD>
-void* read_dbd(void* void_arg) {
-    auto args = (PerfArgs<DBD>*)void_arg;
-    args->ready = true;
-    butil::Timer t;
-    while (!g_stopped) {
-        if (g_started) {
-            break;
-        }
-        bthread_usleep(10);
-    }
-    t.start();
-    while (!g_stopped) {
-        {
-            typename DBD::ScopedPtr ptr;
-            args->dbd->Read(&ptr);
-            // ptr->find(1);
-        }
-        ++args->counter;
-    }
-    t.stop();
-    args->elapse_ns = t.n_elapsed();
-    return NULL;
-}
-
-template<typename DBD>
-void PerfTest(int thread_num, bool modify_during_reading) {
-    g_started = false;
-    g_stopped = false;
-    DBD dbd;
-    for (int i = 0; i < 1024; ++i) {
-        dbd.Modify(AddMapN, i);
-    }
-    pthread_t threads[thread_num];
-    std::vector<PerfArgs<DBD>> args(thread_num);
-    for (int i = 0; i < thread_num; ++i) {
-        args[i].dbd = &dbd;
-        ASSERT_EQ(0, pthread_create(&threads[i], NULL, read_dbd<DBD>, 
&args[i]));
-    }
-    while (true) {
-        bool all_ready = true;
-        for (int i = 0; i < thread_num; ++i) {
-            if (!args[i].ready) {
-                all_ready = false;
-                break;
-            }
-        }
-        if (all_ready) {
-            break;
-        }
-        usleep(1000);
-    }
-    g_started = true;
-    char prof_name[32];
-    snprintf(prof_name, sizeof(prof_name), "doubly_buffered_data_%d.prof", 
++g_prof_name_counter);
-    ProfilerStart(prof_name);
-    int64_t run_ms = 5 * 1000;
-    if (modify_during_reading) {
-        int64_t start = butil::gettimeofday_ms();
-        int i = 1;
-        while (butil::gettimeofday_ms() - start < run_ms) {
-            ASSERT_TRUE(dbd.Modify(AddMapN, i++));
-            usleep(1000);
-        }
-    } else {
-        usleep(run_ms * 1000);
-    }
-    ProfilerStop();
-    g_stopped = true;
-    int64_t wait_time = 0;
-    int64_t count = 0;
-    for (int i = 0; i < thread_num; ++i) {
-        pthread_join(threads[i], NULL);
-        wait_time += args[i].elapse_ns;
-        count += args[i].counter;
-    }
-    LOG(INFO) << " thread_num=" << thread_num
-              << " modify_during_reading=" << modify_during_reading
-              << " count=" << count
-              << " average_time=" << wait_time / (double)count
-              << " qps=" << (double)count / wait_time * (1000 * 1000 * 1000);
-}
-
-TEST_F(LoadBalancerTest, performance) {
-    int thread_num = 1;
-    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
-    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
-
-    thread_num = 4;
-    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
-    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
-
-    thread_num = 8;
-    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
-    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
-
-    thread_num = 16;
-    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
-    PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
-}
-
 } //namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to