This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 792df3612 feat(info): add thread CPU time measurement for worker thread monitoring (#3379)
792df3612 is described below

commit 792df3612c2d6bde2159cacbd104f12e0f73c6d5
Author: sryan yuan <[email protected]>
AuthorDate: Tue Mar 3 16:32:27 2026 +0800

    feat(info): add thread CPU time measurement for worker thread monitoring (#3379)
    
    ## Summary
    This PR introduces worker thread CPU time monitoring to identify
    bottlenecks and fixes precision issues in process CPU time calculations.
    
    ## Problem Statement
    1. **Lack of granular thread monitoring**: The current `INFO` command
    provides only overall process CPU time, which includes background
    threads. This makes it difficult to identify bottlenecks in worker
    threads specifically, as background thread activity can mask true worker
    thread utilization.
    
    2. **Precision loss in CPU time calculation**: The existing CPU time
    calculation drops the fractional-second part because of integer division
    (`tv_usec / 1000000` evaluates to 0 whenever `tv_usec` is below one
    million), so the reported values carry only whole seconds. This results
    in inaccurate reporting, especially for short operations where
    sub-second precision matters.
    
    ## Solution
    ### 1. Worker Thread CPU Time Monitoring
    - Added new `worker_cpu_time` metric to track CPU usage per worker
    thread
    - Implemented per-thread CPU time collection using platform-specific
    APIs
    - Formatted output with microsecond precision (6 decimal places) to
    match system precision
    - Sample output: `worker_cpu_time:[0.123456,0.456789,1.234567]`
    
    ### 2. CPU Time Calculation Fix
    - Fixed precision loss in `used_cpu_user` and `used_cpu_sys`
    calculations
    - Replaced integer division with floating point division:
      - **Before**: `static_cast<float>(tv_usec / 1000000)` → truncates to integer
      - **After**: `static_cast<float>(tv_usec) / 1e6` → preserves microseconds
    
    ## Benefits
    - **Pinpoint worker thread bottlenecks**: Isolate worker thread CPU
    usage from background threads
    - **Accurate performance analysis**: Microsecond-precision timing
    enables precise performance profiling
    - **Better load diagnosis**: Distinguish between actual worker
    saturation and background activity
    
    ---------
    
    Co-authored-by: yxj25245 <[email protected]>
---
 src/common/thread_util.cc | 42 ++++++++++++++++++++++++++++++++++++++++++
 src/common/thread_util.h  |  2 ++
 src/server/server.cc      | 14 +++++++++++---
 src/server/worker.cc      |  7 ++++++-
 src/server/worker.h       |  7 ++++++-
 5 files changed, 67 insertions(+), 5 deletions(-)

diff --git a/src/common/thread_util.cc b/src/common/thread_util.cc
index 150e713b4..93267a08f 100644
--- a/src/common/thread_util.cc
+++ b/src/common/thread_util.cc
@@ -23,6 +23,11 @@
 #include <fmt/std.h>
 #include <pthread.h>
 
+#ifdef __APPLE__
+#include <mach/mach.h>
+#include <mach/thread_act.h>
+#endif
+
 namespace util {
 
 void ThreadSetName(const char *name) {
@@ -33,6 +38,43 @@ void ThreadSetName(const char *name) {
 #endif
 }
 
+#ifdef __APPLE__
+double ThreadGetCPUTime(std::thread::native_handle_type thread_id) {
+  if (!thread_id) {
+    return 0.0;
+  }
+
+  mach_port_t mach_thread = pthread_mach_thread_np(thread_id);
+
+  thread_basic_info_data_t info;
+  mach_msg_type_number_t count = THREAD_BASIC_INFO_COUNT;
+
+  if (thread_info(mach_thread, THREAD_BASIC_INFO, (thread_info_t)&info, 
&count) != KERN_SUCCESS) {
+    return 0.0;
+  }
+
+  return (static_cast<double>(info.user_time.seconds) + 
static_cast<double>(info.user_time.microseconds) / 1e6) +
+         (static_cast<double>(info.system_time.seconds) + 
static_cast<double>(info.system_time.microseconds) / 1e6);
+}
+#else
+double ThreadGetCPUTime(std::thread::native_handle_type thread_id) {
+  if (!thread_id) {
+    return 0.0;
+  }
+
+  clockid_t clock_id = 0;
+  if (pthread_getcpuclockid(thread_id, &clock_id) != 0) {
+    return 0.0;
+  }
+
+  timespec ts;
+  if (clock_gettime(clock_id, &ts) != 0) {
+    return 0.0;
+  }
+  return static_cast<double>(ts.tv_sec) + static_cast<double>(ts.tv_nsec) / 
1e9;
+}
+#endif
+
 template <void (std::thread::*F)(), typename... Args>
 Status ThreadOperationImpl(std::thread &t, const char *op, Args &&...args) {
   try {
diff --git a/src/common/thread_util.h b/src/common/thread_util.h
index f4c9cc753..3d7415582 100644
--- a/src/common/thread_util.h
+++ b/src/common/thread_util.h
@@ -32,6 +32,8 @@ namespace util {
 
 void ThreadSetName(const char *name);
 
+double ThreadGetCPUTime(std::thread::native_handle_type thread_id);
+
 template <typename F>
 StatusOr<std::thread> CreateThread(const char *name, F f) {
   try {
diff --git a/src/server/server.cc b/src/server/server.cc
index e3a5ec33e..5c337a23a 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -1402,10 +1402,18 @@ Server::InfoEntries Server::GetCpuInfo() {  // 
NOLINT(readability-convert-member
 
   rusage self_ru;
   getrusage(RUSAGE_SELF, &self_ru);
-  entries.emplace_back("used_cpu_sys", 
static_cast<float>(self_ru.ru_stime.tv_sec) +
-                                           
static_cast<float>(self_ru.ru_stime.tv_usec / 1000000));
+  entries.emplace_back(
+      "used_cpu_sys", static_cast<float>(self_ru.ru_stime.tv_sec) + 
static_cast<float>(self_ru.ru_stime.tv_usec) / 1e6);
   entries.emplace_back("used_cpu_user", 
static_cast<float>(self_ru.ru_utime.tv_sec) +
-                                            
static_cast<float>(self_ru.ru_utime.tv_usec / 1000000));
+                                            
static_cast<float>(self_ru.ru_utime.tv_usec) / 1e6);
+
+  std::vector<double> thread_cpu_times(worker_threads_.size());
+  for (std::size_t i{0}; i < worker_threads_.size(); ++i) {
+    thread_cpu_times[i] = 
util::ThreadGetCPUTime(worker_threads_[i]->GetNativeHandle());
+  }
+  entries.emplace_back(
+      "worker_cpu_time",
+      fmt::format("[{}]", util::StringJoin(thread_cpu_times, [](auto v) { 
return std::to_string(v); }, ",")));
 
   return entries;
 }
diff --git a/src/server/worker.cc b/src/server/worker.cc
index 5078546e5..e97e374b5 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -23,6 +23,7 @@
 #include <event2/util.h>
 #include <unistd.h>
 
+#include <atomic>
 #include <cstdint>
 #include <stdexcept>
 #include <string>
@@ -589,6 +590,7 @@ void WorkerThread::Start() {
 
   if (s) {
     t_ = std::move(*s);
+    native_thread_handle_.store(t_.native_handle(), std::memory_order_relaxed);
   } else {
     ERROR("[worker] Failed to start worker thread, err: {}", s.Msg());
     return;
@@ -597,7 +599,10 @@ void WorkerThread::Start() {
   INFO("[worker] Thread #{} started", fmt::streamed(t_.get_id()));
 }
 
-void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); }
+void WorkerThread::Stop(uint32_t wait_seconds) {
+  native_thread_handle_.store(std::thread::native_handle_type{}, 
std::memory_order_relaxed);
+  worker_->Stop(wait_seconds);
+}
 
 void WorkerThread::Join() {
   if (auto s = util::ThreadJoin(t_); !s) {
diff --git a/src/server/worker.h b/src/server/worker.h
index bf6918408..bf4e86b23 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -25,6 +25,7 @@
 #include <event2/listener.h>
 #include <event2/util.h>
 
+#include <atomic>
 #include <cstdint>
 #include <cstring>
 #include <lua.hpp>
@@ -104,7 +105,8 @@ class Worker : EventCallbackBase<Worker>, 
EvconnlistenerBase<Worker> {
 
 class WorkerThread {
  public:
-  explicit WorkerThread(std::unique_ptr<Worker> worker) : 
worker_(std::move(worker)) {}
+  explicit WorkerThread(std::unique_ptr<Worker> worker)
+      : native_thread_handle_{std::thread::native_handle_type{}}, 
worker_(std::move(worker)) {}
   ~WorkerThread() = default;
   WorkerThread(const WorkerThread &) = delete;
   WorkerThread(WorkerThread &&) = delete;
@@ -116,7 +118,10 @@ class WorkerThread {
   void Join();
   bool IsTerminated() const { return worker_->IsTerminated(); }
 
+  std::thread::native_handle_type GetNativeHandle() { return 
native_thread_handle_.load(std::memory_order_relaxed); }
+
  private:
   std::thread t_;
+  std::atomic<std::thread::native_handle_type> native_thread_handle_;
   std::unique_ptr<Worker> worker_;
 };

Reply via email to