This is an automated email from the ASF dual-hosted git repository.
mgreber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new e08855500 [util] remove chromium-based Atomics from env_posix.cc
e08855500 is described below
commit e08855500fa10474d1d1cd7914f91767d5ee1cab
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Nov 18 15:19:00 2025 -0800
[util] remove chromium-based Atomics from env_posix.cc
With this patch, Atomic64 and Barrier_AtomicIncrement() are no longer
used in env_posix.cc: the logic of PosixEnv::gettid() is now implemented
using std::atomic. Also, the global static and thread-local static
variables are moved inside of the PosixEnv::gettid() method which is an
improvement as well, according to Clang-Tidy.
In addition, I added a new test scenario to verify that the generated
thread identifiers are indeed unique. I didn't want to rely on existing
CHECK()/DCHECK() assertions in various places for indirect verification
of a few explicit invariants.
Apart from the new test scenario, there are no functional modifications
in this changelist.
Change-Id: I63d883de66c51d5d4a6c7eb1eebe63713f1f6dcc
Reviewed-on: http://gerrit.cloudera.org:8080/23689
Tested-by: Kudu Jenkins
Reviewed-by: Ashwani Raina <[email protected]>
Reviewed-by: Marton Greber <[email protected]>
---
src/kudu/util/env_posix.cc | 17 +++++---
src/kudu/util/mt-threadlocal-test.cc | 83 +++++++++++++++++++++++++++++++++++-
2 files changed, 92 insertions(+), 8 deletions(-)
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 4809bc312..b637cccbc 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -7,8 +7,12 @@
#include <fnmatch.h>
#include <fts.h>
#include <glob.h>
+#include <openssl/crypto.h>
#include <openssl/rand.h>
#include <openssl/ssl.h>
+#if OPENSSL_VERSION_NUMBER >= 0x30000000L
+#include <openssl/types.h>
+#endif
#include <pthread.h>
#include <sys/resource.h>
#include <sys/socket.h>
@@ -18,8 +22,10 @@
#include <sys/uio.h>
#include <sys/utsname.h>
#include <unistd.h>
+// IWYU pragma: no_include <bits/struct_stat.h>
#include <algorithm>
+#include <atomic>
#include <cerrno>
#include <climits>
#include <cstdint>
@@ -39,7 +45,6 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
-#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/macros.h"
@@ -87,8 +92,6 @@
#include <sys/vfs.h>
#endif // defined(__APPLE__)
-using base::subtle::Atomic64;
-using base::subtle::Barrier_AtomicIncrement;
using kudu::security::ssl_make_unique;
using std::accumulate;
using std::shared_ptr;
@@ -230,9 +233,6 @@ bool ValidateMultiTenancySettings() {
}
GROUP_FLAG_VALIDATOR(enable_multi_tenancy, ValidateMultiTenancySettings);
-static __thread uint64_t thread_local_id;
-static Atomic64 cur_thread_local_id_;
-
namespace kudu {
const char* const Env::kInjectedFailureStatusMsg = "INJECTED FAILURE";
@@ -2013,11 +2013,14 @@ class PosixEnv : public Env {
}
uint64_t gettid() override {
+ static std::atomic<uint64_t> cur_thread_local_id{0};
+ static __thread uint64_t thread_local_id{0};
// Platform-independent thread ID. We can't use pthread_self here,
// because that function returns a totally opaque ID, which can't be
// compared via normal means.
if (thread_local_id == 0) {
- thread_local_id = Barrier_AtomicIncrement(&cur_thread_local_id_, 1);
+ // pre-increment is equivalent to 'cur_thread_local_id.fetch_add(1) + 1'
+ thread_local_id = ++cur_thread_local_id;
}
return thread_local_id;
}
diff --git a/src/kudu/util/mt-threadlocal-test.cc b/src/kudu/util/mt-threadlocal-test.cc
index 484387410..cd42da9f4 100644
--- a/src/kudu/util/mt-threadlocal-test.cc
+++ b/src/kudu/util/mt-threadlocal-test.cc
@@ -15,13 +15,16 @@
// specific language governing permissions and limitations
// under the License.
+#include <algorithm>
+#include <cstddef>
#include <cstdint>
+#include <iterator>
#include <memory>
#include <ostream>
+#include <set>
#include <string>
#include <thread>
#include <unordered_set>
-#include <utility>
#include <vector>
#include <glog/logging.h>
@@ -39,6 +42,7 @@
#include "kudu/util/threadlocal.h"
#include "kudu/util/threadlocal_cache.h"
+using std::set;
using std::string;
using std::thread;
using std::unique_ptr;
@@ -353,5 +357,82 @@ TEST_F(ThreadLocalTest, TestThreadLocalCache) {
ASSERT_EQ(nullptr, tlc->Lookup(1));
}
+// Verify that PosixEnv::gettid() generates unique thread identifiers.
+TEST_F(ThreadLocalTest, ThreadId) {
+ constexpr const size_t kNumThreadsLevel0 = 4;
+ constexpr const size_t kNumThreadsLevel1 = 16;
+
+ vector<vector<size_t>> thread_ids;
+ thread_ids.resize(kNumThreadsLevel0);
+ for (size_t i = 0; i < kNumThreadsLevel0; ++i) {
+ thread_ids[i].assign(kNumThreadsLevel1, 0);
+ }
+
+ vector<thread> level0_threads;
+ level0_threads.reserve(kNumThreadsLevel0);
+
+ Env* env = Env::Default();
+ const uint64_t main_tid = env->gettid();
+
+ CountDownLatch start(1);
+ for (size_t l0_idx = 0; l0_idx < kNumThreadsLevel0; ++l0_idx) {
+ level0_threads.emplace_back([&, idx = l0_idx]() {
+ vector<thread> level1_threads;
+ level1_threads.reserve(kNumThreadsLevel1);
+ // Wait on a latch to start creating threads concurrently with other
+ // level0 threads.
+ start.Wait();
+ for (size_t l1_idx = 0; l1_idx < kNumThreadsLevel1; ++l1_idx) {
+ level1_threads.emplace_back([&, parent_idx = idx, my_idx = l1_idx]() {
+ thread_ids[parent_idx][my_idx] = env->gettid();
+ });
+ }
+ for (auto& t : level1_threads) {
+ t.join();
+ }
+ });
+ }
+ start.CountDown();
+
+ for (auto& t : level0_threads) {
+ t.join();
+ }
+
+ set<size_t> unique_ids;
+ for (const auto& ids : thread_ids) {
+ std::copy(ids.cbegin(), ids.cend(), std::inserter(unique_ids,
unique_ids.end()));
+ }
+ ASSERT_EQ(kNumThreadsLevel0 * kNumThreadsLevel1, unique_ids.size());
+ ASSERT_GT(*unique_ids.cbegin(), main_tid);
+
+ // Once all the threads spawned above are joined, start a few new ones and
+ // make sure the newly dispensed identifiers don't repeat the identifiers
+ // of the threads that have exited already.
+ {
+ constexpr const size_t kNumThreads = 10;
+ vector<thread> threads;
+ threads.reserve(kNumThreads);
+ vector<size_t> thread_ids;
+ thread_ids.assign(kNumThreads, 0);
+
+ for (size_t idx = 0; idx < kNumThreads; ++idx) {
+ threads.emplace_back([&, my_idx = idx]() {
+ thread_ids[my_idx] = env->gettid();
+ });
+ }
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ set<size_t> ids;
+ std::copy(thread_ids.cbegin(), thread_ids.cend(), std::inserter(ids,
ids.end()));
+ // All identifiers must be unique.
+ ASSERT_EQ(kNumThreads, ids.size());
+ // All of the new identifiers are greater than any of the identifiers
+ // generated earlier.
+ ASSERT_GT(*ids.cbegin(), *unique_ids.crbegin());
+ }
+}
+
} // namespace threadlocal
} // namespace kudu