This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f1d8939 Add support for checking all living bthreads (#3096)
5f1d8939 is described below

commit 5f1d893982a13c4c913ded79086ed2f781891459
Author: Regal <[email protected]>
AuthorDate: Wed Oct 22 20:13:30 2025 +0800

    Add support for checking all living bthreads (#3096)
    
    Users can check all living bthreads with `curl ip:port/bthreads/all`,
    or, when BRPC_BTHREAD_TRACER is enabled, with
    `curl ip:port/bthreads/all?st=1` to also show the bthread stack traces.
    This is an enhancement of the original /bthreads service, which
    provides a method to check a specified bthread by its designated
    bthread id; that is hard to use when the user has no idea what the
    bthread id is.
    
    Also fix two bugs: _enable_priority_queue was not initialized, and the
    task status was incorrectly set to TASK_STATUS_FIRST_READY.
---
 src/brpc/builtin/bthreads_service.cpp  | 32 ++++++++++++---------
 src/brpc/builtin/index_service.cpp     |  2 +-
 src/bthread/bthread.cpp                | 20 +++++++++++++
 src/bthread/task_control.cpp           | 20 +++++++++++++
 src/bthread/task_control.h             |  1 +
 src/bthread/task_group.cpp             | 24 +++++++++++-----
 src/bthread/task_group.h               |  2 +-
 src/bthread/task_group_inl.h           |  4 +--
 src/bthread/task_tracer.cpp            |  2 +-
 src/butil/resource_pool.h              |  5 ++++
 src/butil/resource_pool_inl.h          | 22 +++++++++++++++
 test/brpc_builtin_service_unittest.cpp | 51 ++++++++++++++++++++++++++++++----
 12 files changed, 155 insertions(+), 30 deletions(-)

diff --git a/src/brpc/builtin/bthreads_service.cpp 
b/src/brpc/builtin/bthreads_service.cpp
index 7676f656..fca86bc6 100644
--- a/src/brpc/builtin/bthreads_service.cpp
+++ b/src/brpc/builtin/bthreads_service.cpp
@@ -23,7 +23,9 @@
 #include "brpc/builtin/bthreads_service.h"
 
 namespace bthread {
-extern void print_task(std::ostream& os, bthread_t tid);
+extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
+                       bool ignore_not_matched = false);
+extern void print_living_tasks(std::ostream& os, bool enable_trace);
 }
 
 
@@ -38,30 +40,34 @@ void 
BthreadsService::default_method(::google::protobuf::RpcController* cntl_bas
     cntl->http_response().set_content_type("text/plain");
     butil::IOBufBuilder os;
     const std::string& constraint = cntl->http_request().unresolved_path();
-    
     if (constraint.empty()) {
 #ifdef BRPC_BTHREAD_TRACER
-        os << "Use /bthreads/<bthread_id> or /bthreads/<bthread_id>?st=1 for 
stack trace";
+        os << "Use /bthreads/<bthread_id> or /bthreads/<bthread_id>?st=1 for 
stack trace\n";
+        os << "To check all living bthread, use /bthreads/all or 
/bthreads/all?st=1 for stack trace\n";
 #else
-        os << "Use /bthreads/<bthread_id>";
+        os << "Use /bthreads/<bthread_id>\n";
+        os << "To check all living bthread, use /bthreads/all\n";
 #endif // BRPC_BTHREAD_TRACER
     } else {
-        char* endptr = NULL;
-        bthread_t tid = strtoull(constraint.c_str(), &endptr, 10);
-        if (*endptr == '\0' || *endptr == '/' || *endptr == '?') {
-            ::bthread::print_task(os, tid);
-
+        bool enable_trace = false;
 #ifdef BRPC_BTHREAD_TRACER
             const std::string* st = cntl->http_request().uri().GetQuery("st");
             if (NULL != st && *st == "1") {
-                os << "\nbthread call stack:\n";
-                ::bthread::stack_trace(os, tid);
+                enable_trace = true;
             }
 #endif // BRPC_BTHREAD_TRACER
-        } else {
-            cntl->SetFailed(ENOMETHOD, "path=%s is not a bthread id",
+        char* endptr = NULL;
+        bthread_t tid = strtoull(constraint.c_str(), &endptr, 10);
+        if (*endptr == '\0' || *endptr == '/' || *endptr == '?') {
+            ::bthread::print_task(os, tid, enable_trace);
+        }
+        else if (constraint != "all" && constraint != "all?st=1") {
+            cntl->SetFailed(ENOMETHOD, "path=%s is not a bthread id or all, or 
all?st=1\n",
                             constraint.c_str());
+        } else {
+            ::bthread::print_living_tasks(os, enable_trace);
         }
+
     }
     os.move_to(cntl->response_attachment());
 }
diff --git a/src/brpc/builtin/index_service.cpp 
b/src/brpc/builtin/index_service.cpp
index 62cb87c1..3b1aa3bc 100644
--- a/src/brpc/builtin/index_service.cpp
+++ b/src/brpc/builtin/index_service.cpp
@@ -158,7 +158,7 @@ void 
IndexService::default_method(::google::protobuf::RpcController* controller,
        << Path("/health", html_addr) << " : Test healthy" << NL
        << Path("/vlog", html_addr) << " : List all VLOG callsites" << NL
        << Path("/sockets", html_addr) << " : Check status of a Socket" << NL
-       << Path("/bthreads", html_addr) << " : Check status of a bthread" << NL
+       << Path("/bthreads", html_addr) << " : Check status of a bthread or all 
living bthreads" << NL
        << Path("/ids", html_addr) << " : Check status of a bthread_id" << NL
        << Path("/protobufs", html_addr) << " : List all protobuf services and 
messages" << NL
        << Path("/list", html_addr) << " : json signature of methods" << NL
diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index a5f178e3..085d814d 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -32,6 +32,8 @@
 #include "bthread/bthread.h"
 
 namespace bthread {
+extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
+                       bool ignore_not_matched = false);
 
 static bool validate_bthread_concurrency(const char*, int32_t val) {
     // bthread_setconcurrency sets the flag on success path which should
@@ -192,8 +194,26 @@ std::string stack_trace(bthread_t tid) {
     }
     return c->stack_trace(tid);
 }
+
 #endif // BRPC_BTHREAD_TRACER
 
+// Print all living (started and not finished) bthreads
+void print_living_tasks(std::ostream& os, bool enable_trace) {
+    TaskControl* c = get_task_control();
+    if (NULL == c) {
+        os << "TaskControl has not been created";
+        return;
+    }
+    auto tids = c->get_living_bthreads();
+    if (tids.empty()) {
+        os << "No living bthreads\n";
+        return;
+    }
+    for (auto tid : tids) {
+        print_task(os, tid, enable_trace, true);
+    }
+}
+
 static int add_workers_for_each_tag(int num) {
     int added = 0;
     auto c = get_task_control();
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 0b34955b..feeeb7d4 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -192,6 +192,7 @@ TaskControl::TaskControl()
     , _signal_per_second(&_cumulated_signal_count)
     , _status(print_rq_sizes_in_the_tc, this)
     , _nbthreads("bthread_count")
+    , _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
     , _priority_queues(FLAGS_task_group_ntags)
     , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
     , _tagged_pl(FLAGS_task_group_ntags)
@@ -587,4 +588,23 @@ bvar::LatencyRecorder* 
TaskControl::create_exposed_pending_time() {
     return pt;
 }
 
+std::vector<bthread_t> TaskControl::get_living_bthreads() {
+    std::vector<bthread_t> living_bthread_ids;
+    living_bthread_ids.reserve(1024);
+    butil::for_each_resource<TaskMeta>([&living_bthread_ids](TaskMeta* m) {
+        // filter out those bthreads created by bthread_start* functions,
+        // i.e. not those created internally to run main task as they are
+        // opaque to user.
+        if (m && m->fn) {
+            // determine whether the bthread is living by checking version
+            const uint32_t given_ver = get_version(m->tid);
+            BAIDU_SCOPED_LOCK(m->version_lock);
+            if (given_ver == *m->version_butex) {
+                living_bthread_ids.push_back(m->tid);
+            }
+        }
+    });
+    return living_bthread_ids;
+}
+
 }  // namespace bthread
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 4d666025..439c96db 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -101,6 +101,7 @@ public:
         _priority_queues[tag].push(tid);
     }
 
+    std::vector<bthread_t> get_living_bthreads();
 private:
     typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
     typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 67f029a0..40daaa1a 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -36,6 +36,7 @@
 #include "bthread/task_control.h"
 #include "bthread/task_group.h"
 #include "bthread/timer_thread.h"
+#include "bthread/bthread.h"
 
 #ifdef __x86_64__
 #include <x86intrin.h>
@@ -678,7 +679,7 @@ void TaskGroup::ending_sched(TaskGroup** pg) {
             }
         }
     }
-    sched_to(pg, next_meta, true);
+    sched_to(pg, next_meta);
 }
 
 void TaskGroup::sched(TaskGroup** pg) {
@@ -699,7 +700,7 @@ void TaskGroup::sched(TaskGroup** pg) {
 
 extern void CheckBthreadScheSafety();
 
-void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending) 
{
+void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
     TaskGroup* g = *pg;
 #ifndef NDEBUG
     if ((++g->_sched_recursive_guard) > 1) {
@@ -1088,10 +1089,11 @@ void TaskGroup::yield(TaskGroup** pg) {
     sched(pg);
 }
 
-void print_task(std::ostream& os, bthread_t tid) {
+void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
+                bool ignore_not_matched = false) {
     TaskMeta* const m = TaskGroup::address_meta(tid);
     if (m == NULL) {
-        os << "bthread=" << tid << " : never existed";
+        os << "bthread=" << tid << " : never existed\n";
         return;
     }
     const uint32_t given_ver = get_version(tid);
@@ -1127,7 +1129,9 @@ void print_task(std::ostream& os, bthread_t tid) {
         }
     }
     if (!matched) {
-        os << "bthread=" << tid << " : not exist now";
+        if (!ignore_not_matched) {
+            os << "bthread=" << tid << " : not exist now\n";
+        }
     } else {
         os << "bthread=" << tid << " :\nstop=" << stop
            << "\ninterrupted=" << interrupted
@@ -1136,6 +1140,7 @@ void print_task(std::ostream& os, bthread_t tid) {
            << "\narg=" << (void*)arg
            << "\nattr={stack_type=" << attr.stack_type
            << " flags=" << attr.flags
+           << " specified_tag=" << attr.tag
            << " keytable_pool=" << attr.keytable_pool
            << "}\nhas_tls=" << has_tls
            << "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns
@@ -1145,8 +1150,13 @@ void print_task(std::ostream& os, bthread_t tid) {
            << "\nstatus=" << status
            << "\ntraced=" << traced
            << "\nworker_tid=" << worker_tid;
-#else
-           ;
+        if (enable_trace) {
+            os << "\nbthread call stack:\n";
+            stack_trace(os, tid);
+        }
+        os << "\n\n";
+ #else
+           << "\n\n";
            (void)status;(void)traced;(void)worker_tid;
 #endif // BRPC_BTHREAD_TRACER
     }
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index 958f81d7..f3e1d402 100644
--- a/src/bthread/task_group.h
+++ b/src/bthread/task_group.h
@@ -111,7 +111,7 @@ public:
     // Suspend caller and run bthread `next_tid' in TaskGroup *pg.
     // Purpose of this function is to avoid pushing `next_tid' to _rq and
     // then being popped by sched(pg), which is not necessary.
-    static void sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending);
+    static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
     static void sched_to(TaskGroup** pg, bthread_t next_tid);
     static void exchange(TaskGroup** pg, TaskMeta* next_meta);
 
diff --git a/src/bthread/task_group_inl.h b/src/bthread/task_group_inl.h
index aec1a284..faa5683b 100644
--- a/src/bthread/task_group_inl.h
+++ b/src/bthread/task_group_inl.h
@@ -56,7 +56,7 @@ inline void TaskGroup::exchange(TaskGroup** pg, TaskMeta* 
next_meta) {
                      ? ready_to_run_in_worker_ignoresignal
                      : ready_to_run_in_worker),
                     &args);
-    TaskGroup::sched_to(pg, next_meta, false);
+    TaskGroup::sched_to(pg, next_meta);
 }
 
 inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
@@ -79,7 +79,7 @@ inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t 
next_tid) {
         }
     }
     // Update now_ns only when wait_task did yield.
-    sched_to(pg, next_meta, false);
+    sched_to(pg, next_meta);
 }
 
 inline void TaskGroup::push_rq(bthread_t tid) {
diff --git a/src/bthread/task_tracer.cpp b/src/bthread/task_tracer.cpp
index 2602fe31..f9d14f02 100644
--- a/src/bthread/task_tracer.cpp
+++ b/src/bthread/task_tracer.cpp
@@ -147,7 +147,7 @@ void TaskTracer::set_status(TaskStatus s, TaskMeta* m) {
 
         tracing = m->traced;
         // bthread is scheduled for the first time.
-        if (TASK_STATUS_READY == s || NULL == m->stack) {
+        if (TASK_STATUS_READY == s && NULL == m->stack) {
             m->status = TASK_STATUS_FIRST_READY;
         } else {
             m->status = s;
diff --git a/src/butil/resource_pool.h b/src/butil/resource_pool.h
index f5667938..e1c09faf 100644
--- a/src/butil/resource_pool.h
+++ b/src/butil/resource_pool.h
@@ -132,6 +132,11 @@ template <typename T> ResourcePoolInfo 
describe_resources() {
     return ResourcePool<T>::singleton()->describe_resources();
 }
 
+// Traverse all allocated resources typed T and apply specified operation
+template <typename T, typename F>
+void for_each_resource(F const& f) {
+    ResourcePool<T>::singleton()->for_each_resource(f);
+}
 }  // namespace butil
 
 #endif  // BUTIL_RESOURCE_POOL_H
diff --git a/src/butil/resource_pool_inl.h b/src/butil/resource_pool_inl.h
index 3274e9b7..82649103 100644
--- a/src/butil/resource_pool_inl.h
+++ b/src/butil/resource_pool_inl.h
@@ -307,6 +307,28 @@ public:
         const size_t n = ResourcePoolFreeChunkMaxItem<T>::value();
         return n < FREE_CHUNK_NITEM ? n : FREE_CHUNK_NITEM;
     }
+
+    template <typename F>
+    void for_each_resource(F const& f) {
+        for (size_t i = 0; i < _ngroup.load(butil::memory_order_acquire); ++i) 
{
+            BlockGroup* bg = 
_block_groups[i].load(butil::memory_order_consume);
+            if (NULL == bg) {
+                break;
+            }
+            size_t nblock = 
std::min(bg->nblock.load(butil::memory_order_relaxed),
+                                     RP_GROUP_NBLOCK);
+            for (size_t j = 0; j < nblock; ++j) {
+                Block* b = bg->blocks[j].load(butil::memory_order_consume);
+                if (NULL != b) {
+                    for (size_t k = 0; k < b->nitem; ++k) {
+                        auto item = b->items + k;
+                        T* obj = (T*)item->void_data();
+                        f(obj);
+                    }
+                }
+            }
+        }
+    }
     
     // Number of all allocated objects, including being used and free.
     ResourcePoolInfo describe_resources() const {
diff --git a/test/brpc_builtin_service_unittest.cpp 
b/test/brpc_builtin_service_unittest.cpp
index 76b99811..4ba8a039 100644
--- a/test/brpc_builtin_service_unittest.cpp
+++ b/test/brpc_builtin_service_unittest.cpp
@@ -852,6 +852,34 @@ void* bthread_trace(void*) {
 }
 #endif // BRPC_BTHREAD_TRACER
 
+// check all living bthreads without need to specify bthread id
+bool check_all_bthreads(bthread_t expected_th, bool enable_trace) {
+    brpc::BthreadsService service;
+    brpc::BthreadsRequest req;
+    brpc::BthreadsResponse res;
+    ClosureChecker done;
+    brpc::Controller cntl;
+    std::string all_string("all");
+    if (enable_trace) {
+        all_string.append("?st=1");
+    }
+    cntl.http_request()._unresolved_path = all_string;
+    cntl.http_request().uri().SetHttpURL("/bthreads/" + all_string);
+    service.default_method(&cntl, &req, &res, &done);
+    EXPECT_FALSE(cntl.Failed());
+    const std::string& content = cntl.response_attachment().to_string();
+    std::string expected_str = butil::string_printf("bthread=%llu",
+        (unsigned long long)expected_th);
+    bool ok = content.find("stop=0") != std::string::npos &&
+        content.find(expected_str) != std::string::npos;
+    if (ok && enable_trace) {
+        ok = content.find("bthread_trace") != std::string::npos;
+    } else if (ok && !enable_trace) {
+        ok = content.find("bthread_trace") == std::string::npos;
+    }
+    return ok;
+}
+
 TEST_F(BuiltinServiceTest, bthreads) {
     brpc::BthreadsService service;
     brpc::BthreadsRequest req;
@@ -869,8 +897,16 @@ TEST_F(BuiltinServiceTest, bthreads) {
         cntl.http_request()._unresolved_path = "not_valid";
         service.default_method(&cntl, &req, &res, &done);
         EXPECT_TRUE(cntl.Failed());
-        CheckErrorText(cntl, "is not a bthread id");
+        CheckErrorText(cntl, "is not a bthread id or all");
     }    
+    {
+        ClosureChecker done;
+        brpc::Controller cntl;
+        cntl.http_request()._unresolved_path = "all";
+        service.default_method(&cntl, &req, &res, &done);
+        EXPECT_FALSE(cntl.Failed());
+        CheckContent(cntl, "stop=0");
+    }
     {
         bthread_t th;
         EXPECT_EQ(0, bthread_start_background(&th, NULL, dummy_bthread, NULL));
@@ -885,13 +921,14 @@ TEST_F(BuiltinServiceTest, bthreads) {
     }
 
 #ifdef BRPC_BTHREAD_TRACER
-    bool ok = false;
+    bool ok = false, check_all_ok = false;
     for (int i = 0; i < 10; ++i) {
         bthread_t th;
         EXPECT_EQ(0, bthread_start_background(&th, NULL, bthread_trace, NULL));
         while (!g_bthread_trace_start) {
             bthread_usleep(1000 * 10);
         }
+        LOG(INFO) << "start bthread = " << th;
         ClosureChecker done;
         brpc::Controller cntl;
         std::string id_string;
@@ -899,16 +936,20 @@ TEST_F(BuiltinServiceTest, bthreads) {
         cntl.http_request().uri().SetHttpURL("/bthreads/" + id_string);
         cntl.http_request()._unresolved_path = id_string;
         service.default_method(&cntl, &req, &res, &done);
-        g_bthread_trace_stop = true;
         EXPECT_FALSE(cntl.Failed());
         const std::string& content = cntl.response_attachment().to_string();
         ok = content.find("stop=0") != std::string::npos &&
              content.find("bthread_trace") != std::string::npos;
-        if (ok) {
+        check_all_ok = check_all_bthreads(th, true) && check_all_bthreads(th, 
false);
+        g_bthread_trace_stop = true;
+        bthread_join(th, NULL);
+        // the `bthread_trace` bthread should not be queried now
+        EXPECT_TRUE(!check_all_bthreads(th, true) && !check_all_bthreads(th, 
false));
+        if (ok && check_all_ok) {
             break;
         }
     }
-    ASSERT_TRUE(ok);
+    ASSERT_TRUE(ok && check_all_ok);
 #endif // BRPC_BTHREAD_TRACER
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to