This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 21a6aab1 New DoublyBufferedData for suspended bthread (#2225)
21a6aab1 is described below
commit 21a6aab1bed6f7febfabd5657ded59a9077fbae0
Author: Bright Chen <[email protected]>
AuthorDate: Sun Jun 25 14:33:16 2023 +0800
New DoublyBufferedData for suspended bthread (#2225)
* New DoublyBufferedData for bthread
* Use cond and ref array
* DoublyBufferedData allows bthread suspended
* optimize cond and var name
---
src/butil/containers/doubly_buffered_data.h | 242 ++++++++++++++-----
test/brpc_load_balancer_unittest.cpp | 350 ++++++++++++++++------------
2 files changed, 392 insertions(+), 200 deletions(-)
diff --git a/src/butil/containers/doubly_buffered_data.h
b/src/butil/containers/doubly_buffered_data.h
index c928037b..024a5ee8 100644
--- a/src/butil/containers/doubly_buffered_data.h
+++ b/src/butil/containers/doubly_buffered_data.h
@@ -31,6 +31,7 @@
#include "butil/errno.h"
#include "butil/atomicops.h"
#include "butil/unique_ptr.h"
+#include "butil/type_traits.h"
namespace butil {
@@ -40,7 +41,8 @@ namespace butil {
// modifications of data. As a side effect, this data structure can store
// a thread-local data for user.
//
-// Read(): begin with a thread-local mutex locked then read the foreground
+// --- `AllowBthreadSuspended=false' ---
+// Read(): Begin with a thread-local mutex locked then read the foreground
// instance which will not be changed before the mutex is unlocked. Since the
// mutex is only locked by Modify() with an empty critical section, the
// function is almost lock-free.
@@ -49,10 +51,38 @@ namespace butil {
// foreground and background, lock thread-local mutexes one by one to make
// sure all existing Read() finish and later Read() see new foreground,
// then modify background(foreground before flip) again.
+//
+// But, when `AllowBthreadSuspended=false', it is not allowed to suspend
bthread
+// while reading. Otherwise, it may cause deadlock.
+//
+//
+// --- `AllowBthreadSuspended=true' ---
+// It is allowed to suspend bthread while reading.
+// It is not allowed to use non-Void TLS.
+// If bthread will not be suspended while reading, it also makes Read() almost
+// lock-free by making Modify() *much* slower.
+// If bthread will be suspended while reading, there is competition among
+// bthreads using the same Wrapper.
+//
+// Read(): Begin with thread-local reference count of foreground instance
+// incremented by one which be protected by a thread-local mutex, then read
+// the foreground instance which will not be changed before its all
thread-local
+// reference count become zero. At last, after the query completes,
thread-local
+// reference count of foreground instance will be decremented by one, and if
+// it becomes zero, notifies Modify().
+//
+// Modify(): Modify background instance which is not used by any Read(), flip
+// foreground and background, lock thread-local mutexes one by one and wait
+// until thread-local reference counts which be protected by a thread-local
+// mutex become 0 to make sure all existing Read() finish and later Read()
+// see new foreground, then modify background(foreground before flip) again.
class Void { };
-template <typename T, typename TLS = Void>
+template <typename T> struct IsVoid : false_type { };
+template <> struct IsVoid<Void> : true_type { };
+
+template <typename T, typename TLS = Void, bool AllowBthreadSuspended = false>
class DoublyBufferedData {
class Wrapper;
class WrapperTLSGroup;
@@ -61,10 +91,14 @@ public:
class ScopedPtr {
friend class DoublyBufferedData;
public:
- ScopedPtr() : _data(NULL), _w(NULL) {}
+ ScopedPtr() : _data(NULL), _index(0), _w(NULL) {}
~ScopedPtr() {
if (_w) {
- _w->EndRead();
+ if (AllowBthreadSuspended) {
+ _w->EndRead(_index);
+ } else {
+ _w->EndRead();
+ }
}
}
const T* get() const { return _data; }
@@ -75,6 +109,8 @@ public:
private:
DISALLOW_COPY_AND_ASSIGN(ScopedPtr);
const T* _data;
+ // Index of foreground instance used by ScopedPtr.
+ int _index;
Wrapper* _w;
};
@@ -164,8 +200,15 @@ private:
const Arg2& _arg2;
};
- const T* UnsafeRead() const
- { return _data + _index.load(butil::memory_order_acquire); }
+ const T* UnsafeRead() const {
+ return _data + _index.load(butil::memory_order_acquire);
+ }
+
+ const T* UnsafeRead(int& index) const {
+ index = _index.load(butil::memory_order_acquire);
+ return _data + index;
+ }
+
Wrapper* AddWrapper(Wrapper*);
void RemoveWrapper(Wrapper*);
@@ -182,10 +225,10 @@ private:
std::vector<Wrapper*> _wrappers;
// Sequence access to _wrappers.
- pthread_mutex_t _wrappers_mutex;
+ pthread_mutex_t _wrappers_mutex{};
// Sequence modifications.
- pthread_mutex_t _modify_mutex;
+ pthread_mutex_t _modify_mutex{};
};
static const pthread_key_t INVALID_PTHREAD_KEY = (pthread_key_t)-1;
@@ -206,8 +249,8 @@ class DoublyBufferedDataWrapperBase<T, Void> {
// WrapperTLSGroup can store Wrapper in thread local storage.
// WrapperTLSGroup will destruct Wrapper data when thread exits,
// other times only reset Wrapper inner structure.
-template <typename T, typename TLS>
-class DoublyBufferedData<T, TLS>::WrapperTLSGroup {
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+class DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSGroup {
public:
const static size_t RAW_BLOCK_SIZE = 4096;
const static size_t ELEMENTS_PER_BLOCK = (RAW_BLOCK_SIZE + sizeof(T) - 1)
/ sizeof(T);
@@ -302,34 +345,49 @@ private:
static __thread std::vector<ThreadBlock*>* _s_tls_blocks;
};
-template <typename T, typename TLS>
-pthread_mutex_t DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_mutex =
PTHREAD_MUTEX_INITIALIZER;
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+pthread_mutex_t DoublyBufferedData<T, TLS,
AllowBthreadSuspended>::WrapperTLSGroup::_s_mutex = PTHREAD_MUTEX_INITIALIZER;
-template <typename T, typename TLS>
-std::deque<typename DoublyBufferedData<T, TLS>::WrapperTLSId>*
- DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_free_ids = NULL;
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+std::deque<typename DoublyBufferedData<T, TLS,
AllowBthreadSuspended>::WrapperTLSId>*
+ DoublyBufferedData<T, TLS,
AllowBthreadSuspended>::WrapperTLSGroup::_s_free_ids = NULL;
-template <typename T, typename TLS>
-typename DoublyBufferedData<T, TLS>::WrapperTLSId
- DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_id = 0;
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::WrapperTLSId
+ DoublyBufferedData<T, TLS,
AllowBthreadSuspended>::WrapperTLSGroup::_s_id = 0;
-template <typename T, typename TLS>
-__thread std::vector<typename DoublyBufferedData<T,
TLS>::WrapperTLSGroup::ThreadBlock*>*
- DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_tls_blocks = NULL;
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+__thread std::vector<typename DoublyBufferedData<T, TLS,
AllowBthreadSuspended>::WrapperTLSGroup::ThreadBlock*>*
+ DoublyBufferedData<T, TLS,
AllowBthreadSuspended>::WrapperTLSGroup::_s_tls_blocks = NULL;
-template <typename T, typename TLS>
-class DoublyBufferedData<T, TLS>::Wrapper
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+class DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Wrapper
: public DoublyBufferedDataWrapperBase<T, TLS> {
friend class DoublyBufferedData;
public:
- explicit Wrapper() : _control(NULL) {
+ explicit Wrapper()
+ : _control(NULL)
+ , _modify_wait(false) {
pthread_mutex_init(&_mutex, NULL);
+ if (AllowBthreadSuspended) {
+ pthread_cond_init(&_cond[0], NULL);
+ pthread_cond_init(&_cond[1], NULL);
+ }
}
~Wrapper() {
if (_control != NULL) {
_control->RemoveWrapper(this);
}
+
+ if (AllowBthreadSuspended) {
+ WaitReadDone(0);
+ WaitReadDone(1);
+
+ pthread_cond_destroy(&_cond[0]);
+ pthread_cond_destroy(&_cond[1]);
+ }
+
pthread_mutex_destroy(&_mutex);
}
@@ -340,23 +398,77 @@ public:
pthread_mutex_lock(&_mutex);
}
+ // For `AllowBthreadSuspended=true'.
+ inline void BeginReadRelease() {
+ pthread_mutex_unlock(&_mutex);
+ }
+
inline void EndRead() {
pthread_mutex_unlock(&_mutex);
}
+ // For `AllowBthreadSuspended=true'.
+ // Thread-local reference count which be protected by _mutex
+ // will be decremented by one.
+ inline void EndRead(int index) {
+ BAIDU_SCOPED_LOCK(_mutex);
+ SubRef(index);
+ SignalReadCond(index);
+ }
+
inline void WaitReadDone() {
BAIDU_SCOPED_LOCK(_mutex);
}
+
+ // For `AllowBthreadSuspended=true'.
+ // Wait until all read of foreground instance done.
+ inline void WaitReadDone(int index) {
+ BAIDU_SCOPED_LOCK(_mutex);
+ int& ref = index == 0 ? _ref[0] : _ref[1];
+ while (ref != 0) {
+ _modify_wait = true;
+ pthread_cond_wait(&_cond[index], &_mutex);
+ }
+ _modify_wait = false;
+ }
+
+ // For `AllowBthreadSuspended=true'.
+ inline void SignalReadCond(int index) {
+ if (_ref[index] == 0 && _modify_wait) {
+ pthread_cond_signal(&_cond[index]);
+ }
+ }
+
+ // For `AllowBthreadSuspended=true'.
+ void AddRef(int index) {
+ ++_ref[index];
+ }
+
+ // For `AllowBthreadSuspended=true'.
+ void SubRef(int index) {
+ --_ref[index];
+ }
private:
DoublyBufferedData* _control;
- pthread_mutex_t _mutex;
+ pthread_mutex_t _mutex{};
+ // For `AllowBthreadSuspended=true'.
+ // _cond[0] for _ref[0], _cond[1] for _ref[1]
+ pthread_cond_t _cond[2]{};
+ // For `AllowBthreadSuspended=true'.
+ // _ref[0] is reference count for _data[0],
+ // _ref[1] is reference count for _data[1].
+ int _ref[2]{0, 0};
+ // For `AllowBthreadSuspended=true'.
+ // Whether there is a Modify() waiting for _ref0/_ref1.
+ bool _modify_wait;
};
// Called when thread initializes thread-local wrapper.
-template <typename T, typename TLS>
-typename DoublyBufferedData<T, TLS>::Wrapper* DoublyBufferedData<T,
TLS>::AddWrapper(
- typename DoublyBufferedData<T, TLS>::Wrapper* w) {
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Wrapper*
+DoublyBufferedData<T, TLS, AllowBthreadSuspended>::AddWrapper(
+ typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Wrapper*
w) {
if (NULL == w) {
return NULL;
}
@@ -378,9 +490,9 @@ typename DoublyBufferedData<T, TLS>::Wrapper*
DoublyBufferedData<T, TLS>::AddWra
}
// Called when thread quits.
-template <typename T, typename TLS>
-void DoublyBufferedData<T, TLS>::RemoveWrapper(
- typename DoublyBufferedData<T, TLS>::Wrapper* w) {
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+void DoublyBufferedData<T, TLS, AllowBthreadSuspended>::RemoveWrapper(
+ typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Wrapper* w) {
if (NULL == w) {
return;
}
@@ -394,10 +506,13 @@ void DoublyBufferedData<T, TLS>::RemoveWrapper(
}
}
-template <typename T, typename TLS>
-DoublyBufferedData<T, TLS>::DoublyBufferedData()
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+DoublyBufferedData<T, TLS, AllowBthreadSuspended>::DoublyBufferedData()
: _index(0)
, _wrapper_key(0) {
+ static_assert(!(AllowBthreadSuspended && !IsVoid<TLS>::value),
+ "Forbidden to allow suspend bthread with non-Void TLS");
+
_wrappers.reserve(64);
pthread_mutex_init(&_modify_mutex, NULL);
pthread_mutex_init(&_wrappers_mutex, NULL);
@@ -411,8 +526,8 @@ DoublyBufferedData<T, TLS>::DoublyBufferedData()
}
}
-template <typename T, typename TLS>
-DoublyBufferedData<T, TLS>::~DoublyBufferedData() {
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+DoublyBufferedData<T, TLS, AllowBthreadSuspended>::~DoublyBufferedData() {
// User is responsible for synchronizations between Read()/Modify() and
// this function.
@@ -429,23 +544,37 @@ DoublyBufferedData<T, TLS>::~DoublyBufferedData() {
pthread_mutex_destroy(&_wrappers_mutex);
}
-template <typename T, typename TLS>
-int DoublyBufferedData<T, TLS>::Read(
- typename DoublyBufferedData<T, TLS>::ScopedPtr* ptr) {
+template <typename T, typename TLS, bool AllowBthreadSuspended>
+int DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Read(
+ typename DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ScopedPtr*
ptr) {
Wrapper* p = WrapperTLSGroup::get_or_create_tls_data(_wrapper_key);
Wrapper* w = AddWrapper(p);
if (BAIDU_LIKELY(w != NULL)) {
- w->BeginRead();
- ptr->_data = UnsafeRead();
- ptr->_w = w;
+ if (AllowBthreadSuspended) {
+ // Use reference count instead of mutex to indicate read of
+ // foreground instance, so during the read process, there is
+ // no need to lock mutex and bthread is allowed to be suspended.
+ w->BeginRead();
+ int index = -1;
+ ptr->_data = UnsafeRead(index);
+ ptr->_index = index;
+ w->AddRef(index);
+ ptr->_w = w;
+ w->BeginReadRelease();
+ } else {
+ w->BeginRead();
+ ptr->_data = UnsafeRead();
+ ptr->_w = w;
+ }
+
return 0;
}
return -1;
}
-template <typename T, typename TLS>
+template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn>
-size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn) {
+size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn& fn) {
// _modify_mutex sequences modifications. Using a separate mutex rather
// than _wrappers_mutex is to avoid blocking threads calling
// AddWrapper() or RemoveWrapper() too long. Most of the time,
modifications
@@ -471,7 +600,12 @@ size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn) {
{
BAIDU_SCOPED_LOCK(_wrappers_mutex);
for (size_t i = 0; i < _wrappers.size(); ++i) {
- _wrappers[i]->WaitReadDone();
+ // Wait read of old foreground instance done.
+ if (AllowBthreadSuspended) {
+ _wrappers[i]->WaitReadDone(bg_index);
+ } else {
+ _wrappers[i]->WaitReadDone();
+ }
}
}
@@ -480,38 +614,38 @@ size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn) {
return ret2;
}
-template <typename T, typename TLS>
+template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename Arg1>
-size_t DoublyBufferedData<T, TLS>::Modify(Fn& fn, const Arg1& arg1) {
+size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(Fn& fn, const
Arg1& arg1) {
Closure1<Fn, Arg1> c(fn, arg1);
return Modify(c);
}
-template <typename T, typename TLS>
+template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename Arg1, typename Arg2>
-size_t DoublyBufferedData<T, TLS>::Modify(
+size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::Modify(
Fn& fn, const Arg1& arg1, const Arg2& arg2) {
Closure2<Fn, Arg1, Arg2> c(fn, arg1, arg2);
return Modify(c);
}
-template <typename T, typename TLS>
+template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn>
-size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(Fn& fn) {
+size_t DoublyBufferedData<T, TLS,
AllowBthreadSuspended>::ModifyWithForeground(Fn& fn) {
WithFG0<Fn> c(fn, _data);
return Modify(c);
}
-template <typename T, typename TLS>
+template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename Arg1>
-size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(Fn& fn, const Arg1&
arg1) {
+size_t DoublyBufferedData<T, TLS,
AllowBthreadSuspended>::ModifyWithForeground(Fn& fn, const Arg1& arg1) {
WithFG1<Fn, Arg1> c(fn, _data, arg1);
return Modify(c);
}
-template <typename T, typename TLS>
+template <typename T, typename TLS, bool AllowBthreadSuspended>
template <typename Fn, typename Arg1, typename Arg2>
-size_t DoublyBufferedData<T, TLS>::ModifyWithForeground(
+size_t DoublyBufferedData<T, TLS, AllowBthreadSuspended>::ModifyWithForeground(
Fn& fn, const Arg1& arg1, const Arg2& arg2) {
WithFG2<Fn, Arg1, Arg2> c(fn, _data, arg1, arg2);
return Modify(c);
diff --git a/test/brpc_load_balancer_unittest.cpp
b/test/brpc_load_balancer_unittest.cpp
index 9cf6894b..019f40dd 100644
--- a/test/brpc_load_balancer_unittest.cpp
+++ b/test/brpc_load_balancer_unittest.cpp
@@ -20,13 +20,10 @@
// Date: Sun Jul 13 15:04:18 CST 2014
#include <sys/types.h>
-#include <sys/socket.h>
#include <map>
#include <gtest/gtest.h>
#include "bthread/bthread.h"
#include "butil/gperftools_profiler.h"
-#include "butil/time.h"
-#include "butil/fast_rand.h"
#include "butil/containers/doubly_buffered_data.h"
#include "brpc/describable.h"
#include "brpc/socket.h"
@@ -34,7 +31,6 @@
#include "brpc/global.h"
#include "brpc/details/load_balancer_with_naming.h"
#include "butil/strings/string_number_conversions.h"
-#include "brpc/excluded_servers.h"
#include "brpc/policy/weighted_round_robin_load_balancer.h"
#include "brpc/policy/round_robin_load_balancer.h"
#include "brpc/policy/weighted_randomized_load_balancer.h"
@@ -42,10 +38,8 @@
#include "brpc/policy/locality_aware_load_balancer.h"
#include "brpc/policy/consistent_hashing_load_balancer.h"
#include "brpc/policy/hasher.h"
-#include "brpc/errno.pb.h"
#include "echo.pb.h"
#include "brpc/channel.h"
-#include "brpc/controller.h"
#include "brpc/server.h"
namespace brpc {
@@ -74,17 +68,7 @@ protected:
};
};
-size_t TLS_ctor = 0;
-size_t TLS_dtor = 0;
-struct TLS {
- TLS() {
- ++TLS_ctor;
- }
- ~TLS() {
- ++TLS_dtor;
- }
-
-};
+class UserTLS {};
struct Foo {
Foo() : x(0) {}
@@ -96,36 +80,232 @@ bool AddN(Foo& f, int n) {
return true;
}
-TEST_F(LoadBalancerTest, doubly_buffered_data) {
+template <typename DBD>
+void test_doubly_buffered_data() {
// test doubly_buffered_data TLS limits
{
std::cout << "current PTHREAD_KEYS_MAX: " << PTHREAD_KEYS_MAX <<
std::endl;
- butil::DoublyBufferedData<Foo> data[PTHREAD_KEYS_MAX + 1];
- butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
+ DBD data[PTHREAD_KEYS_MAX + 1];
+ typename DBD::ScopedPtr ptr;
ASSERT_EQ(0, data[PTHREAD_KEYS_MAX].Read(&ptr));
ASSERT_EQ(0, ptr->x);
}
- butil::DoublyBufferedData<Foo> d;
+ DBD d;
{
- butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
+ typename DBD::ScopedPtr ptr;
ASSERT_EQ(0, d.Read(&ptr));
ASSERT_EQ(0, ptr->x);
}
{
- butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
+ typename DBD::ScopedPtr ptr;
ASSERT_EQ(0, d.Read(&ptr));
ASSERT_EQ(0, ptr->x);
}
d.Modify(AddN, 10);
{
- butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
+ typename DBD::ScopedPtr ptr;
ASSERT_EQ(0, d.Read(&ptr));
ASSERT_EQ(10, ptr->x);
}
}
+TEST_F(LoadBalancerTest, doubly_buffered_data) {
+ test_doubly_buffered_data<butil::DoublyBufferedData<Foo>>();
+ test_doubly_buffered_data<butil::DoublyBufferedData<Foo, butil::Void,
false>>();
+ test_doubly_buffered_data<butil::DoublyBufferedData<Foo, UserTLS,
false>>();
+ test_doubly_buffered_data<butil::DoublyBufferedData<Foo, butil::Void,
true>>();
+}
+
+bool exitFlag = false;
+
+template <typename DBD>
+void* DBDBthread(void* arg) {
+ auto d = static_cast<DBD*>(arg);
+ while(!exitFlag){
+ typename DBD::ScopedPtr ptr;
+ d->Read(&ptr);
+
+ // If DBD is DoublyBufferedData<T, TLS, false>, may cause deadlock.
+ bthread_usleep(100 * 1000);
+ }
+
+ return NULL;
+}
+
+template <typename DBD>
+void DBDMultiBthread() {
+ DBD d;
+ d.Modify(AddN, 1);
+ {
+ typename DBD::ScopedPtr ptr;
+ ASSERT_EQ(0, d.Read(&ptr));
+ ASSERT_EQ(1, ptr->x);
+ }
+
+ bthread_t tids[10000];
+ for (size_t i = 0; i < ARRAY_SIZE(tids); ++i) {
+ ASSERT_EQ(0, bthread_start_urgent(&tids[i], NULL, DBDBthread<DBD>,
&d));
+ }
+
+ // Modify during reading.
+ int64_t start = butil::gettimeofday_ms();
+ while (butil::gettimeofday_ms() - start < 10 * 1000) {
+ d.Modify(AddN, 1);
+ typename DBD::ScopedPtr ptr;
+ d.Read(&ptr);
+ usleep(100 * 1000);
+ }
+ exitFlag = true;
+ for (size_t i = 0; i < ARRAY_SIZE(tids); ++i) {
+ ASSERT_EQ(0, bthread_join(tids[i], NULL));
+ }
+}
+
+// Deadlock, only for test.
+// TEST_F(LoadBalancerTest, doubly_buffered_data_multi_bthread) {
+// DBDMultiBthread<butil::DoublyBufferedData<Foo>>();
+// DBDMultiBthread<butil::DoublyBufferedData<Foo, butil::Void, false>>();
+// }
+
+TEST_F(LoadBalancerTest, doubly_buffered_data_bthread_multi_bthread) {
+ DBDMultiBthread<butil::DoublyBufferedData<Foo, butil::Void, true>>();
+}
+
+
+bool g_started = false;
+bool g_stopped = false;
+int g_prof_name_counter = 0;
+
+using PerfMap = std::unordered_map<int, int>;
+
+bool AddMapN(PerfMap& f, int n) {
+ ++f[n];
+ return true;
+}
+
+template<typename DBD>
+struct BAIDU_CACHELINE_ALIGNMENT PerfArgs {
+ DBD* dbd;
+ int64_t counter;
+ int64_t elapse_ns;
+ bool ready;
+
+ PerfArgs() : dbd(NULL), counter(0), elapse_ns(0), ready(false) {}
+};
+
+template<typename DBD>
+void* read_dbd(void* void_arg) {
+ auto args = (PerfArgs<DBD>*)void_arg;
+ args->ready = true;
+ butil::Timer t;
+ while (!g_stopped) {
+ if (g_started) {
+ break;
+ }
+ bthread_usleep(10);
+ }
+ t.start();
+ while (!g_stopped) {
+ {
+ typename DBD::ScopedPtr ptr;
+ args->dbd->Read(&ptr);
+ // ptr->find(1);
+ }
+ ++args->counter;
+ }
+ t.stop();
+ args->elapse_ns = t.n_elapsed();
+ return NULL;
+}
+
+template<typename DBD>
+void PerfTest(int thread_num, bool modify_during_reading) {
+ g_started = false;
+ g_stopped = false;
+ DBD dbd;
+ for (int i = 0; i < 1024; ++i) {
+ dbd.Modify(AddMapN, i);
+ }
+ pthread_t threads[thread_num];
+ std::vector<PerfArgs<DBD>> args(thread_num);
+ for (int i = 0; i < thread_num; ++i) {
+ args[i].dbd = &dbd;
+ ASSERT_EQ(0, pthread_create(&threads[i], NULL, read_dbd<DBD>,
&args[i]));
+ }
+ while (true) {
+ bool all_ready = true;
+ for (int i = 0; i < thread_num; ++i) {
+ if (!args[i].ready) {
+ all_ready = false;
+ break;
+ }
+ }
+ if (all_ready) {
+ break;
+ }
+ usleep(1000);
+ }
+ g_started = true;
+ char prof_name[32];
+ snprintf(prof_name, sizeof(prof_name), "doubly_buffered_data_%d.prof",
++g_prof_name_counter);
+ ProfilerStart(prof_name);
+ int64_t run_ms = 5 * 1000;
+ if (modify_during_reading) {
+ int64_t start = butil::gettimeofday_ms();
+ int i = 1;
+ while (butil::gettimeofday_ms() - start < run_ms) {
+ ASSERT_TRUE(dbd.Modify(AddMapN, i++));
+ usleep(1000);
+ }
+ } else {
+ usleep(run_ms * 1000);
+ }
+ ProfilerStop();
+ g_stopped = true;
+ int64_t wait_time = 0;
+ int64_t count = 0;
+ for (int i = 0; i < thread_num; ++i) {
+ pthread_join(threads[i], NULL);
+ wait_time += args[i].elapse_ns;
+ count += args[i].counter;
+ }
+ LOG(INFO) << butil::class_name<DBD>()
+ << " thread_num=" << thread_num
+ << " modify_during_reading=" << modify_during_reading
+ << " count=" << count
+ << " average_time=" << wait_time / (double)count
+ << " qps=" << (double)count / wait_time * (1000 * 1000 * 1000);
+}
+
+TEST_F(LoadBalancerTest, dbd_performance) {
+ int thread_num = 1;
+ PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
+ PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
+ PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void,
true>>(thread_num, false);
+ PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void,
true>>(thread_num, true);
+
+ thread_num = 4;
+ PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
+ PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
+ PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void,
true>>(thread_num, false);
+ PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void,
true>>(thread_num, true);
+
+ thread_num = 8;
+ PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
+ PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
+ PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void,
true>>(thread_num, false);
+ PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void,
true>>(thread_num, true);
+
+ thread_num = 16;
+ PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
+ PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
+ PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void,
true>>(thread_num, false);
+ PerfTest<butil::DoublyBufferedData<PerfMap, butil::Void,
true>>(thread_num, true);
+}
+
+
typedef brpc::policy::LocalityAwareLoadBalancer LALB;
static void ValidateWeightTree(
@@ -1101,126 +1281,4 @@ TEST_F(LoadBalancerTest, la_selection_too_long) {
ASSERT_EQ(EHOSTDOWN, lb.SelectServer(in, &out));
}
-bool g_started = false;
-bool g_stopped = false;
-int g_prof_name_counter = 0;
-
-using PerfMap = std::unordered_map<int, int>;
-
-bool AddMapN(PerfMap& f, int n) {
- ++f[n];
- return true;
-}
-
-template<typename DBD>
-struct BAIDU_CACHELINE_ALIGNMENT PerfArgs {
- DBD* dbd;
- int64_t counter;
- int64_t elapse_ns;
- bool ready;
-
- PerfArgs() : dbd(NULL), counter(0), elapse_ns(0), ready(false) {}
-};
-
-template<typename DBD>
-void* read_dbd(void* void_arg) {
- auto args = (PerfArgs<DBD>*)void_arg;
- args->ready = true;
- butil::Timer t;
- while (!g_stopped) {
- if (g_started) {
- break;
- }
- bthread_usleep(10);
- }
- t.start();
- while (!g_stopped) {
- {
- typename DBD::ScopedPtr ptr;
- args->dbd->Read(&ptr);
- // ptr->find(1);
- }
- ++args->counter;
- }
- t.stop();
- args->elapse_ns = t.n_elapsed();
- return NULL;
-}
-
-template<typename DBD>
-void PerfTest(int thread_num, bool modify_during_reading) {
- g_started = false;
- g_stopped = false;
- DBD dbd;
- for (int i = 0; i < 1024; ++i) {
- dbd.Modify(AddMapN, i);
- }
- pthread_t threads[thread_num];
- std::vector<PerfArgs<DBD>> args(thread_num);
- for (int i = 0; i < thread_num; ++i) {
- args[i].dbd = &dbd;
- ASSERT_EQ(0, pthread_create(&threads[i], NULL, read_dbd<DBD>,
&args[i]));
- }
- while (true) {
- bool all_ready = true;
- for (int i = 0; i < thread_num; ++i) {
- if (!args[i].ready) {
- all_ready = false;
- break;
- }
- }
- if (all_ready) {
- break;
- }
- usleep(1000);
- }
- g_started = true;
- char prof_name[32];
- snprintf(prof_name, sizeof(prof_name), "doubly_buffered_data_%d.prof",
++g_prof_name_counter);
- ProfilerStart(prof_name);
- int64_t run_ms = 5 * 1000;
- if (modify_during_reading) {
- int64_t start = butil::gettimeofday_ms();
- int i = 1;
- while (butil::gettimeofday_ms() - start < run_ms) {
- ASSERT_TRUE(dbd.Modify(AddMapN, i++));
- usleep(1000);
- }
- } else {
- usleep(run_ms * 1000);
- }
- ProfilerStop();
- g_stopped = true;
- int64_t wait_time = 0;
- int64_t count = 0;
- for (int i = 0; i < thread_num; ++i) {
- pthread_join(threads[i], NULL);
- wait_time += args[i].elapse_ns;
- count += args[i].counter;
- }
- LOG(INFO) << " thread_num=" << thread_num
- << " modify_during_reading=" << modify_during_reading
- << " count=" << count
- << " average_time=" << wait_time / (double)count
- << " qps=" << (double)count / wait_time * (1000 * 1000 * 1000);
-}
-
-TEST_F(LoadBalancerTest, performance) {
- int thread_num = 1;
- PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
- PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
-
- thread_num = 4;
- PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
- PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
-
- thread_num = 8;
- PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
- PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
-
- thread_num = 16;
- PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, false);
- PerfTest<butil::DoublyBufferedData<PerfMap>>(thread_num, true);
-}
-
} //namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]