This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 25cfdb08 Thread local without limit by _SC_THREAD_KEYS_MAX (#2296)
25cfdb08 is described below
commit 25cfdb08300f367334da3b52a235a6fb3362c6cf
Author: Bright Chen <[email protected]>
AuthorDate: Fri Oct 13 10:20:41 2023 +0800
Thread local without limit by _SC_THREAD_KEYS_MAX (#2296)
* Thread local without num limit
* Thread local without num limit
* Thread local without num limit
* Opt rename
---
BUILD.bazel | 1 +
CMakeLists.txt | 1 +
Makefile | 1 +
src/butil/thread_key.cpp | 194 ++++++++++++++++++
src/butil/thread_key.h | 202 +++++++++++++++++++
test/BUILD.bazel | 1 +
test/CMakeLists.txt | 1 +
test/Makefile | 1 +
test/thread_key_unittest.cpp | 460 +++++++++++++++++++++++++++++++++++++++++++
9 files changed, 862 insertions(+)
diff --git a/BUILD.bazel b/BUILD.bazel
index 8848593a..5d317c90 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -200,6 +200,7 @@ BUTIL_SRCS = [
"src/butil/status.cpp",
"src/butil/string_printf.cpp",
"src/butil/thread_local.cpp",
+ "src/butil/thread_key.cpp",
"src/butil/unix_socket.cpp",
"src/butil/endpoint.cpp",
"src/butil/fd_utility.cpp",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 66cc0157..a01a0bf6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -377,6 +377,7 @@ set(BUTIL_SOURCES
${PROJECT_SOURCE_DIR}/src/butil/status.cpp
${PROJECT_SOURCE_DIR}/src/butil/string_printf.cpp
${PROJECT_SOURCE_DIR}/src/butil/thread_local.cpp
+ ${PROJECT_SOURCE_DIR}/src/butil/thread_key.cpp
${PROJECT_SOURCE_DIR}/src/butil/unix_socket.cpp
${PROJECT_SOURCE_DIR}/src/butil/endpoint.cpp
${PROJECT_SOURCE_DIR}/src/butil/fd_utility.cpp
diff --git a/Makefile b/Makefile
index 574c63bb..87ddc5a4 100644
--- a/Makefile
+++ b/Makefile
@@ -151,6 +151,7 @@ BUTIL_SOURCES = \
src/butil/status.cpp \
src/butil/string_printf.cpp \
src/butil/thread_local.cpp \
+ src/butil/thread_key.cpp \
src/butil/unix_socket.cpp \
src/butil/endpoint.cpp \
src/butil/fd_utility.cpp \
diff --git a/src/butil/thread_key.cpp b/src/butil/thread_key.cpp
new file mode 100644
index 00000000..02bcd586
--- /dev/null
+++ b/src/butil/thread_key.cpp
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "thread_key.h"
+#include "pthread.h"
+#include <deque>
+#include "butil/thread_local.h"
+
+namespace butil {
+
+// Check whether an entry is unused.
+#define KEY_UNUSED(p) (((p) & 1) == 0)
+
+// Check whether a key is usable. We cannot reuse an allocated key if
+// the sequence counter would overflow after the next destroy call.
+// This would mean that we potentially free memory for a key with the
+// same sequence. This is *very* unlikely to happen, A program would
+// have to create and destroy a key 2^31 times. If it should happen we
+// simply don't use this specific key anymore.
+#define KEY_USABLE(p) (((size_t) (p)) < ((size_t) ((p) + 2)))
+
+static const uint32_t THREAD_KEY_RESERVE = 8096;
+pthread_mutex_t g_thread_key_mutex = PTHREAD_MUTEX_INITIALIZER;
+static size_t g_id = 0;
+static std::deque<size_t>* g_free_ids = NULL;
+static std::vector<ThreadKeyInfo>* g_thread_keys = NULL;
+static __thread std::vector<ThreadKeyTLS>* g_tls_data = NULL;
+
+ThreadKey& ThreadKey::operator=(ThreadKey&& other) noexcept {
+ if (this == &other) {
+ return *this;
+ }
+
+ _id = other._id;
+ _seq = other._seq;
+ other.Reset();
+ return *this;
+}
+
+bool ThreadKey::Valid() const {
+ return _id != InvalidID && !KEY_UNUSED(_seq);
+}
+
+static void DestroyTlsData() {
+ if (!g_tls_data) {
+ return;
+ }
+ std::vector<ThreadKeyInfo> dummy_keys;
+ {
+ BAIDU_SCOPED_LOCK(g_thread_key_mutex);
+ if (BAIDU_LIKELY(g_thread_keys)) {
+ dummy_keys.insert(dummy_keys.end(), g_thread_keys->begin(),
g_thread_keys->end());
+ }
+ }
+ for (size_t i = 0; i < g_tls_data->size(); ++i) {
+ if (!KEY_UNUSED(dummy_keys[i].seq) && dummy_keys[i].dtor) {
+ dummy_keys[i].dtor((*g_tls_data)[i].data);
+ }
+ }
+ delete g_tls_data;
+ g_tls_data = NULL;
+}
+
+static std::deque<size_t>* GetGlobalFreeIds() {
+ if (BAIDU_UNLIKELY(!g_free_ids)) {
+ g_free_ids = new (std::nothrow) std::deque<size_t>();
+ if (BAIDU_UNLIKELY(!g_free_ids)) {
+ abort();
+ }
+ }
+
+ return g_free_ids;
+}
+
+int thread_key_create(ThreadKey& thread_key, DtorFunction dtor) {
+ BAIDU_SCOPED_LOCK(g_thread_key_mutex);
+ size_t id;
+ auto free_ids = GetGlobalFreeIds();
+ if (!free_ids) {
+ return ENOMEM;
+ }
+
+ if (!free_ids->empty()) {
+ id = free_ids->back();
+ free_ids->pop_back();
+ } else {
+ if (g_id >= ThreadKey::InvalidID) {
+ // No more available ids.
+ return EAGAIN;
+ }
+ id = g_id++;
+ if(BAIDU_UNLIKELY(!g_thread_keys)) {
+ g_thread_keys = new (std::nothrow) std::vector<ThreadKeyInfo>;
+ if(BAIDU_UNLIKELY(!g_thread_keys)) {
+ return ENOMEM;
+ }
+ g_thread_keys->reserve(THREAD_KEY_RESERVE);
+ }
+ g_thread_keys->resize(id + 1);
+ }
+
+ ++((*g_thread_keys)[id].seq);
+ (*g_thread_keys)[id].dtor = dtor;
+ thread_key._id = id;
+ thread_key._seq = (*g_thread_keys)[id].seq;
+
+ return 0;
+}
+
+int thread_key_delete(ThreadKey& thread_key) {
+ if (BAIDU_UNLIKELY(!thread_key.Valid())) {
+ return EINVAL;
+ }
+
+ BAIDU_SCOPED_LOCK(g_thread_key_mutex);
+ size_t id = thread_key._id;
+ size_t seq = thread_key._seq;
+ if (id >= g_thread_keys->size() ||
+ seq != (*g_thread_keys)[id].seq ||
+ KEY_UNUSED((*g_thread_keys)[id].seq)) {
+ thread_key.Reset();
+ return EINVAL;
+ }
+
+ if (BAIDU_UNLIKELY(!GetGlobalFreeIds())) {
+ return ENOMEM;
+ }
+
+ ++((*g_thread_keys)[id].seq);
+ // Collect the usable key id for reuse.
+ if (KEY_USABLE((*g_thread_keys)[id].seq)) {
+ GetGlobalFreeIds()->push_back(id);
+ }
+ thread_key.Reset();
+
+ return 0;
+}
+
+int thread_setspecific(ThreadKey& thread_key, void* data) {
+ if (BAIDU_UNLIKELY(!thread_key.Valid())) {
+ return EINVAL;
+ }
+ size_t id = thread_key._id;
+ size_t seq = thread_key._seq;
+ if (BAIDU_UNLIKELY(!g_tls_data)) {
+ g_tls_data = new (std::nothrow) std::vector<ThreadKeyTLS>;
+ if (BAIDU_UNLIKELY(!g_tls_data)) {
+ return ENOMEM;
+ }
+ g_tls_data->reserve(THREAD_KEY_RESERVE);
+ // Register the destructor of tls_data in this thread.
+ butil::thread_atexit(DestroyTlsData);
+ }
+
+ if (id >= g_tls_data->size()) {
+ g_tls_data->resize(id + 1);
+ }
+
+ (*g_tls_data)[id].seq = seq;
+ (*g_tls_data)[id].data = data;
+
+ return 0;
+}
+
+void* thread_getspecific(ThreadKey& thread_key) {
+ if (BAIDU_UNLIKELY(!thread_key.Valid())) {
+ return NULL;
+ }
+ size_t id = thread_key._id;
+ size_t seq = thread_key._seq;
+ if (BAIDU_UNLIKELY(!g_tls_data ||
+ id >= g_tls_data->size() ||
+ (*g_tls_data)[id].seq != seq)){
+ return NULL;
+ }
+
+ return (*g_tls_data)[id].data;
+}
+
+} // namespace butil
\ No newline at end of file
diff --git a/src/butil/thread_key.h b/src/butil/thread_key.h
new file mode 100644
index 00000000..48f02f7d
--- /dev/null
+++ b/src/butil/thread_key.h
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_THREAD_KEY_H
+#define BRPC_THREAD_KEY_H
+
+#include <limits>
+#include <pthread.h>
+#include <stdlib.h>
+#include <vector>
+#include "butil/scoped_lock.h"
+
+namespace butil {
+
+typedef void (*DtorFunction)(void *);
+
+class ThreadKey {
+public:
+ friend int thread_key_create(ThreadKey& thread_key, DtorFunction dtor);
+ friend int thread_key_delete(ThreadKey& thread_key);
+ friend int thread_setspecific(ThreadKey& thread_key, void* data);
+ friend void* thread_getspecific(ThreadKey& thread_key);
+
+ static constexpr size_t InvalidID = std::numeric_limits<size_t>::max();
+ static constexpr size_t InitSeq = 0;
+
+ constexpr ThreadKey() :_id(InvalidID), _seq(InitSeq) {}
+
+ ~ThreadKey() {
+ Reset();
+ }
+
+ ThreadKey(ThreadKey&& other) noexcept
+ : _id(other._id)
+ , _seq(other._seq) {
+ other.Reset();
+ }
+
+ ThreadKey& operator=(ThreadKey&& other) noexcept;
+
+ ThreadKey(const ThreadKey& other) = delete;
+ ThreadKey& operator=(const ThreadKey& other) = delete;
+
+ bool Valid() const;
+
+ void Reset() {
+ _id = InvalidID;
+ _seq = InitSeq;
+ }
+
+ private:
+ size_t _id; // Key id.
+ // Sequence number form g_thread_keys set in thread_key_create.
+ size_t _seq;
+};
+
+struct ThreadKeyInfo {
+ ThreadKeyInfo() : seq(0), dtor(NULL) {}
+
+ size_t seq; // Already allocated?
+ DtorFunction dtor; // Destruction routine.
+};
+
+struct ThreadKeyTLS {
+ ThreadKeyTLS() : seq(0), data(NULL) {}
+
+ // Sequence number form ThreadKey,
+ // set in `thread_setspecific',
+ // used to check if the key is valid in `thread_getspecific'.
+ size_t seq;
+ void* data; // User data.
+};
+
+// pthread_key_xxx implication without num limit of key.
+int thread_key_create(ThreadKey& thread_key, DtorFunction dtor);
+int thread_key_delete(ThreadKey& thread_key);
+int thread_setspecific(ThreadKey& thread_key, void* data);
+void* thread_getspecific(ThreadKey& thread_key);
+
+
+template <typename T>
+class ThreadLocal {
+public:
+ ThreadLocal() : ThreadLocal(false) {}
+
+ explicit ThreadLocal(bool delete_on_thread_exit);
+
+ ~ThreadLocal();
+
+ // non-copyable
+ ThreadLocal(const ThreadLocal&) = delete;
+ ThreadLocal& operator=(const ThreadLocal&) = delete;
+
+ T* get();
+
+ T* operator->() const { return get(); }
+
+ T& operator*() const { return *get(); }
+
+ void reset(T* ptr);
+
+ void reset() {
+ reset(NULL);
+ }
+
+private:
+ static void DefaultDtor(void* ptr) {
+ if (ptr) {
+ delete static_cast<T*>(ptr);
+ }
+ }
+
+ ThreadKey _key;
+ pthread_mutex_t _mutex;
+ // All pointers of data allocated by the ThreadLocal.
+ std::vector<T*> ptrs;
+ // Delete data on thread exit or destructor of ThreadLocal.
+ bool _delete_on_thread_exit;
+};
+
+template <typename T>
+ThreadLocal<T>::ThreadLocal(bool delete_on_thread_exit)
+ : _mutex(PTHREAD_MUTEX_INITIALIZER)
+ , _delete_on_thread_exit(delete_on_thread_exit) {
+ DtorFunction dtor = _delete_on_thread_exit ? DefaultDtor : NULL;
+ thread_key_create(_key, dtor);
+}
+
+
+template <typename T>
+ThreadLocal<T>::~ThreadLocal() {
+ thread_key_delete(_key);
+ if (!_delete_on_thread_exit) {
+ BAIDU_SCOPED_LOCK(_mutex);
+ for (auto ptr : ptrs) {
+ DefaultDtor(ptr);
+ }
+ }
+ pthread_mutex_destroy(&_mutex);
+}
+
+template <typename T>
+T* ThreadLocal<T>::get() {
+ T* ptr = static_cast<T*>(thread_getspecific(_key));
+ if (!ptr) {
+ ptr = new (std::nothrow) T;
+ if (!ptr) {
+ return NULL;
+ }
+ int rc = thread_setspecific(_key, ptr);
+ if (rc != 0) {
+ DefaultDtor(ptr);
+ return NULL;
+ }
+ {
+ BAIDU_SCOPED_LOCK(_mutex);
+ ptrs.push_back(ptr);
+ }
+ }
+ return ptr;
+}
+
+template <typename T>
+void ThreadLocal<T>::reset(T* ptr) {
+ T* old_ptr = get();
+ if (thread_setspecific(_key, ptr) != 0) {
+ return;
+ }
+ {
+ BAIDU_SCOPED_LOCK(_mutex);
+ if (ptr) {
+ ptrs.push_back(ptr);
+ }
+ // Remove and delete old_ptr.
+ if (old_ptr) {
+ auto iter = std::find(ptrs.begin(), ptrs.end(), old_ptr);
+ if (iter!=ptrs.end()) {
+ ptrs.erase(iter);
+ }
+ DefaultDtor(old_ptr);
+ }
+ }
+}
+
+}
+
+
+#endif //BRPC_THREAD_KEY_H
diff --git a/test/BUILD.bazel b/test/BUILD.bazel
index 82dcd882..2c47aa10 100644
--- a/test/BUILD.bazel
+++ b/test/BUILD.bazel
@@ -127,6 +127,7 @@ TEST_BUTIL_SOURCES = [
"synchronous_event_unittest.cpp",
"temp_file_unittest.cpp",
"baidu_thread_local_unittest.cpp",
+ "thread_key_unittest.cpp",
"baidu_time_unittest.cpp",
"flat_map_unittest.cpp",
"crc32c_unittest.cc",
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 9720a0fa..aa441d27 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -162,6 +162,7 @@ SET(TEST_BUTIL_SOURCES
${PROJECT_SOURCE_DIR}/test/synchronous_event_unittest.cpp
${PROJECT_SOURCE_DIR}/test/temp_file_unittest.cpp
${PROJECT_SOURCE_DIR}/test/baidu_thread_local_unittest.cpp
+ ${PROJECT_SOURCE_DIR}/test/thread_key_unittest.cpp
${PROJECT_SOURCE_DIR}/test/baidu_time_unittest.cpp
${PROJECT_SOURCE_DIR}/test/flat_map_unittest.cpp
${PROJECT_SOURCE_DIR}/test/crc32c_unittest.cc
diff --git a/test/Makefile b/test/Makefile
index 871a99ed..0723d2df 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -132,6 +132,7 @@ TEST_BUTIL_SOURCES = \
synchronous_event_unittest.cpp \
temp_file_unittest.cpp \
baidu_thread_local_unittest.cpp \
+ thread_key_unittest.cpp \
baidu_time_unittest.cpp \
flat_map_unittest.cpp \
crc32c_unittest.cc \
diff --git a/test/thread_key_unittest.cpp b/test/thread_key_unittest.cpp
new file mode 100644
index 00000000..a4609aed
--- /dev/null
+++ b/test/thread_key_unittest.cpp
@@ -0,0 +1,460 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+
+#include "butil/thread_key.h"
+#include "butil/fast_rand.h"
+#include "bthread/bthread.h"
+
+namespace butil {
+namespace {
+
+//pthread_key_xxx implication without num limit...
+//user promise no setspecific/getspecific called in calling
thread_key_delete().
+// Check whether an entry is unused.
+#define KEY_UNUSED(p) (((p) & 1) == 0)
+
+// Check whether a key is usable. We cannot reuse an allocated key if
+// the sequence counter would overflow after the next destroy call.
+// This would mean that we potentially free memory for a key with the
+// same sequence. This is *very* unlikely to happen, A program would
+// have to create and destroy a key 2^31 times. If it should happen we
+// simply don't use this specific key anymore.
+#define KEY_USABLE(p) (((size_t) (p)) < ((size_t) ((p) + 2)))
+
+bool g_started = false;
+bool g_stopped = false;
+
+struct ThreadKeyInfo {
+ uint32_t id;
+ uint32_t seq;
+};
+
+TEST(ThreadLocalTest, sanity) {
+ {
+ ThreadKey key;
+ for (int i = 0; i < 5; ++i) {
+ std::unique_ptr<int> data(new int(1));
+ int *raw_data = data.get();
+ ASSERT_EQ(0, butil::thread_key_create(key, NULL));
+
+ ASSERT_EQ(NULL, butil::thread_getspecific(key));
+ ASSERT_EQ(0, butil::thread_setspecific(key, (void *)raw_data));
+ ASSERT_EQ(raw_data, butil::thread_getspecific(key));
+
+ ASSERT_EQ(0, butil::thread_key_delete(key));
+ ASSERT_EQ(NULL, butil::thread_getspecific(key));
+ ASSERT_NE(0, butil::thread_setspecific(key, (void *)raw_data));
+ }
+ }
+
+ for (int i = 0; i < 5; ++i) {
+ ThreadLocal<int> tl;
+ ASSERT_TRUE(tl.get()!=NULL);
+ int* data = new int;
+ tl.reset(data); // tl owns data
+ ASSERT_EQ(data, tl.get());
+ tl.reset(); // data has been deleted
+ ASSERT_TRUE(tl.get()!=NULL);
+ }
+}
+
+TEST(ThreadLocalTest, thread_key_seq) {
+ std::vector<uint32_t> seqs;
+ std::vector<ThreadKey> keys;
+ for (int i = 0; i < 10000; ++i) {
+ bool create = fast_rand_less_than(2);
+ uint64_t num = fast_rand_less_than(5);
+ if (keys.empty() || create) {
+ for (uint64_t j = 0; j < num; ++j) {
+ keys.emplace_back();
+ ASSERT_EQ(0, butil::thread_key_create(keys.back(), NULL));
+ ASSERT_TRUE(!KEY_UNUSED(keys.back()._seq));
+ if (keys.back()._id >= seqs.size()) {
+ seqs.resize(keys.back()._id + 1);
+ } else {
+ ASSERT_EQ(seqs[keys.back()._id] + 2, keys.back()._seq);
+ }
+ seqs[keys.back()._id] = keys.back()._seq;
+ }
+ } else {
+ for (uint64_t j = 0; j < num && !keys.empty(); ++j) {
+ uint64_t index = fast_rand_less_than(keys.size());
+ ASSERT_TRUE(!KEY_UNUSED(seqs[keys[index]._id]));
+ ASSERT_EQ(0, butil::thread_key_delete(keys[index]));
+ keys.erase(keys.begin() + index);
+ }
+ }
+ }
+}
+
+void* THreadKeyCreateAndDeleteFunc(void* arg) {
+ while (!g_stopped) {
+ ThreadKey key;
+ EXPECT_EQ(0, butil::thread_key_create(key, NULL));
+ EXPECT_TRUE(!KEY_UNUSED(key._seq));
+ EXPECT_EQ(0, butil::thread_key_delete(key));
+ }
+ return NULL;
+}
+
+TEST(ThreadLocalTest, thread_key_create_and_delete) {
+ LOG(INFO) << "numeric_limits<uint32_t>::max()=" <<
std::numeric_limits<uint32_t>::max();
+ g_stopped = false;
+ const int thread_num = 8;
+ pthread_t threads[thread_num];
+ for (int i = 0; i < thread_num; ++i) {
+ ASSERT_EQ(0, pthread_create(&threads[i], NULL,
THreadKeyCreateAndDeleteFunc, NULL));
+ }
+ sleep(2);
+ g_stopped = true;
+ for (const auto& thread : threads) {
+ pthread_join(thread, NULL);
+ }
+}
+
+void* ThreadLocalFunc(void* arg) {
+ auto thread_locals = (std::vector<ThreadLocal<int>*>*)arg;
+ std::vector<int> expects(thread_locals->size(), 0);
+ for (auto tl : *thread_locals) {
+ EXPECT_TRUE(tl->get() != NULL);
+ *(tl->get()) = 0;
+ }
+ while (!g_stopped) {
+ uint64_t index =
+ fast_rand_less_than(thread_locals->size());
+ EXPECT_TRUE((*thread_locals)[index]->get() != NULL);
+ EXPECT_EQ(*((*thread_locals)[index]->get()), expects[index]);
+ ++(*((*thread_locals)[index]->get()));
+ ++expects[index];
+ bthread_usleep(10);
+ }
+ return NULL;
+}
+
+TEST(ThreadLocalTest, thread_local_multi_thread) {
+ g_stopped = false;
+ int thread_local_num = 20480;
+ std::vector<ThreadLocal<int>*> args(thread_local_num, NULL);
+ for (int i = 0; i < thread_local_num; ++i) {
+ args[i] = new ThreadLocal<int>();
+ ASSERT_TRUE(args[i]->get() != NULL);
+ }
+ const int thread_num = 8;
+ pthread_t threads[thread_num];
+ for (int i = 0; i < thread_num; ++i) {
+ ASSERT_EQ(0, pthread_create(&threads[i], NULL, ThreadLocalFunc,
&args));
+ }
+
+ sleep(5);
+ g_stopped = true;
+ for (const auto& thread : threads) {
+ pthread_join(thread, NULL);
+ }
+ for (auto tl : args) {
+ delete tl;
+ }
+}
+
+struct BAIDU_CACHELINE_ALIGNMENT ThreadKeyArg {
+ std::vector<ThreadKey*> thread_keys;
+ bool ready_delete = false;
+};
+
+bool g_deleted = false;
+void* ThreadKeyFunc(void* arg) {
+ auto thread_key_arg = (ThreadKeyArg*)arg;
+ auto thread_keys = thread_key_arg->thread_keys;
+ std::vector<int> expects(thread_keys.size(), 0);
+ for (auto key : thread_keys) {
+ EXPECT_TRUE(butil::thread_getspecific(*key) == NULL);
+ EXPECT_EQ(0, butil::thread_setspecific(*key, new int(0)));
+ EXPECT_EQ(*(static_cast<int*>(butil::thread_getspecific(*key))), 0);
+ }
+ while (!g_stopped) {
+ uint64_t index =
+ fast_rand_less_than(thread_keys.size());
+ auto data =
static_cast<int*>(butil::thread_getspecific(*thread_keys[index]));
+ EXPECT_TRUE(data != NULL);
+ EXPECT_EQ(*data, expects[index]);
+ ++(*data);
+ ++expects[index];
+ bthread_usleep(10);
+ }
+
+ thread_key_arg->ready_delete = true;
+ while (!g_deleted) {
+ bthread_usleep(10);
+ }
+
+ for (auto key : thread_keys) {
+ EXPECT_TRUE(butil::thread_getspecific(*key) == NULL)
+ << butil::thread_getspecific(*key);
+ }
+ return NULL;
+}
+
+TEST(ThreadLocalTest, thread_key_multi_thread) {
+ g_stopped = false;
+ g_deleted = false;
+ std::vector<ThreadKey*> thread_keys;
+ int key_num = 20480;
+ for (int i = 0; i < key_num; ++i) {
+ thread_keys.push_back(new ThreadKey());
+ ASSERT_EQ(0, butil::thread_key_create(*thread_keys.back(), [](void*
data) {
+ delete static_cast<int*>(data);
+ }));
+ ASSERT_TRUE(butil::thread_getspecific(*thread_keys.back()) == NULL);
+ ASSERT_EQ(0, butil::thread_setspecific(*thread_keys.back(), new
int(0)));
+
ASSERT_EQ(*(static_cast<int*>(butil::thread_getspecific(*thread_keys.back()))),
0);
+ }
+ const int thread_num = 8;
+ std::vector<ThreadKeyArg> args(thread_num);
+ pthread_t threads[thread_num];
+ for (int i = 0; i < thread_num; ++i) {
+ args[i].thread_keys = thread_keys;
+ ASSERT_EQ(0, pthread_create(&threads[i], NULL, ThreadKeyFunc,
&args[i]));
+ }
+
+ sleep(5);
+ g_stopped = true;
+ while (true) {
+ bool all_ready = true;
+ for (int i = 0; i < thread_num; ++i) {
+ if (!args[i].ready_delete) {
+ all_ready = false;
+ break;
+ }
+ }
+ if (all_ready) {
+ break;
+ }
+ usleep(1000);
+ }
+ for (auto key : thread_keys) {
+ ASSERT_EQ(0, butil::thread_key_delete(*key));
+ ASSERT_TRUE(butil::thread_getspecific(*key) == NULL);
+ }
+ g_deleted = true;
+
+ for (const auto& thread : threads) {
+ ASSERT_EQ(0, pthread_join(thread, NULL));
+ }
+ for (auto key : thread_keys) {
+ delete key;
+ }
+}
+
+DEFINE_bool(test_pthread_key, true, "test pthread_key");
+
+struct BAIDU_CACHELINE_ALIGNMENT ThreadKeyPerfArgs {
+ pthread_key_t pthread_key;
+ ThreadKey* thread_key;
+ bool is_pthread_key;
+ int64_t counter;
+ int64_t elapse_ns;
+ bool ready;
+
+ ThreadKeyPerfArgs()
+ : thread_key(NULL)
+ , is_pthread_key(true)
+ , counter(0)
+ , elapse_ns(0)
+ , ready(false) {}
+};
+
+void* ThreadKeyPerfFunc(void* void_arg) {
+ auto args = (ThreadKeyPerfArgs*)void_arg;
+ args->ready = true;
+ std::unique_ptr<int> data(new int(1));
+ if (args->is_pthread_key) {
+ pthread_setspecific(args->pthread_key, (void*)data.get());
+ } else {
+ butil::thread_setspecific(*args->thread_key, (void*)data.get());
+ }
+ butil::Timer t;
+ while (!g_stopped) {
+ if (g_started) {
+ break;
+ }
+ bthread_usleep(10);
+ }
+ t.start();
+ while (!g_stopped) {
+ if (args->is_pthread_key) {
+ pthread_getspecific(args->pthread_key);
+ } else {
+ butil::thread_getspecific(*args->thread_key);
+ }
+ ++args->counter;
+ }
+ t.stop();
+ args->elapse_ns = t.n_elapsed();
+ return NULL;
+}
+
+
+void ThreadKeyPerfTest(int thread_num, bool test_pthread_key) {
+ g_started = false;
+ g_stopped = false;
+ pthread_key_t pthread_key;
+ butil::ThreadKey thread_key;
+ if (test_pthread_key) {
+ ASSERT_EQ(0, pthread_key_create(&pthread_key, NULL));
+ } else {
+ ASSERT_EQ(0, butil::thread_key_create(thread_key, NULL));
+ }
+ pthread_t threads[thread_num];
+ std::vector<ThreadKeyPerfArgs> args(thread_num);
+ for (int i = 0; i < thread_num; ++i) {
+ if (test_pthread_key) {
+ args[i].pthread_key = pthread_key;
+ args[i].is_pthread_key = true;
+ } else {
+ args[i].thread_key = &thread_key;
+ args[i].is_pthread_key = false;
+ }
+ ASSERT_EQ(0, pthread_create(&threads[i], NULL, ThreadKeyPerfFunc,
&args[i]));
+ }
+ while (true) {
+ bool all_ready = true;
+ for (int i = 0; i < thread_num; ++i) {
+ if (!args[i].ready) {
+ all_ready = false;
+ break;
+ }
+ }
+ if (all_ready) {
+ break;
+ }
+ usleep(1000);
+ }
+ g_started = true;
+ int64_t run_ms = 5 * 1000;
+ usleep(run_ms * 1000);
+ g_stopped = true;
+ int64_t wait_time = 0;
+ int64_t count = 0;
+ for (int i = 0; i < thread_num; ++i) {
+ pthread_join(threads[i], NULL);
+ wait_time += args[i].elapse_ns;
+ count += args[i].counter;
+ }
+ if (test_pthread_key) {
+ ASSERT_EQ(0, pthread_key_delete(pthread_key));
+ } else {
+ ASSERT_EQ(0, butil::thread_key_delete(thread_key));
+ }
+ LOG(INFO) << (test_pthread_key ? "pthread_key" : "thread_key")
+ << " thread_num=" << thread_num
+ << " count=" << count
+ << " average_time=" << wait_time / (double)count;
+}
+
+struct BAIDU_CACHELINE_ALIGNMENT ThreadLocalPerfArgs {
+ ThreadLocal<int>* tl;
+ int64_t counter;
+ int64_t elapse_ns;
+ bool ready;
+
+ ThreadLocalPerfArgs()
+ : tl(NULL) , counter(0)
+ , elapse_ns(0) , ready(false) {}
+};
+
+void* ThreadLocalPerfFunc(void* void_arg) {
+ auto args = (ThreadLocalPerfArgs*)void_arg;
+ args->ready = true;
+ EXPECT_TRUE(args->tl->get() != NULL);
+ butil::Timer t;
+ while (!g_stopped) {
+ if (g_started) {
+ break;
+ }
+ bthread_usleep(10);
+ }
+ t.start();
+ while (!g_stopped) {
+ args->tl->get();
+ ++args->counter;
+ }
+ t.stop();
+ args->elapse_ns = t.n_elapsed();
+ return NULL;
+}
+
+void ThreadLocalPerfTest(int thread_num) {
+ g_started = false;
+ g_stopped = false;
+ ThreadLocal<int> tl;
+ pthread_t threads[thread_num];
+ std::vector<ThreadLocalPerfArgs> args(thread_num);
+ for (int i = 0; i < thread_num; ++i) {
+ args[i].tl = &tl;
+ ASSERT_EQ(0, pthread_create(&threads[i], NULL, ThreadLocalPerfFunc,
&args[i]));
+ }
+ while (true) {
+ bool all_ready = true;
+ for (int i = 0; i < thread_num; ++i) {
+ if (!args[i].ready) {
+ all_ready = false;
+ break;
+ }
+ }
+ if (all_ready) {
+ break;
+ }
+ usleep(1000);
+ }
+ g_started = true;
+ int64_t run_ms = 5 * 1000;
+ usleep(run_ms * 1000);
+ g_stopped = true;
+ int64_t wait_time = 0;
+ int64_t count = 0;
+ for (int i = 0; i < thread_num; ++i) {
+ pthread_join(threads[i], NULL);
+ wait_time += args[i].elapse_ns;
+ count += args[i].counter;
+ }
+ LOG(INFO) << "ThreadLocal thread_num=" << thread_num
+ << " count=" << count
+ << " average_time=" << wait_time / (double)count;
+}
+
+TEST(ThreadLocalTest, thread_key_performance) {
+ int thread_num = 1;
+ ThreadKeyPerfTest(thread_num, true);
+ ThreadKeyPerfTest(thread_num, false);
+ ThreadLocalPerfTest(thread_num);
+
+ thread_num = 4;
+ ThreadKeyPerfTest(thread_num, true);
+ ThreadKeyPerfTest(thread_num, false);
+ ThreadLocalPerfTest(thread_num);
+
+ thread_num = 8;
+ ThreadKeyPerfTest(thread_num, true);
+ ThreadKeyPerfTest(thread_num, false);
+ ThreadLocalPerfTest(thread_num);
+
+}
+
+}
+} // namespace butil
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]