This is an automated email from the ASF dual-hosted git repository.
guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 5f1d8939 Add support for checking all living bthreads (#3096)
5f1d8939 is described below
commit 5f1d893982a13c4c913ded79086ed2f781891459
Author: Regal <[email protected]>
AuthorDate: Wed Oct 22 20:13:30 2025 +0800
Add support for checking all living bthreads (#3096)
Users can list all living bthreads with `curl ip:port/bthreads/all`,
or, when BRPC_BTHREAD_TRACER is enabled, with `curl ip:port/bthreads/all?st=1`
to also show each bthread's stack trace.
This enhances the original /bthreads service, which can only inspect a
single bthread given its id; that is of little use when the user has no
way of knowing which bthread ids exist.
Also fix a bug where _enable_priority_queue was left uninitialized, and a
bug where the task status was incorrectly set to TASK_STATUS_FIRST_READY.
---
src/brpc/builtin/bthreads_service.cpp | 32 ++++++++++++---------
src/brpc/builtin/index_service.cpp | 2 +-
src/bthread/bthread.cpp | 20 +++++++++++++
src/bthread/task_control.cpp | 20 +++++++++++++
src/bthread/task_control.h | 1 +
src/bthread/task_group.cpp | 24 +++++++++++-----
src/bthread/task_group.h | 2 +-
src/bthread/task_group_inl.h | 4 +--
src/bthread/task_tracer.cpp | 2 +-
src/butil/resource_pool.h | 5 ++++
src/butil/resource_pool_inl.h | 22 +++++++++++++++
test/brpc_builtin_service_unittest.cpp | 51 ++++++++++++++++++++++++++++++----
12 files changed, 155 insertions(+), 30 deletions(-)
diff --git a/src/brpc/builtin/bthreads_service.cpp
b/src/brpc/builtin/bthreads_service.cpp
index 7676f656..fca86bc6 100644
--- a/src/brpc/builtin/bthreads_service.cpp
+++ b/src/brpc/builtin/bthreads_service.cpp
@@ -23,7 +23,9 @@
#include "brpc/builtin/bthreads_service.h"
namespace bthread {
-extern void print_task(std::ostream& os, bthread_t tid);
+extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
+ bool ignore_not_matched = false);
+extern void print_living_tasks(std::ostream& os, bool enable_trace);
}
@@ -38,30 +40,34 @@ void
BthreadsService::default_method(::google::protobuf::RpcController* cntl_bas
cntl->http_response().set_content_type("text/plain");
butil::IOBufBuilder os;
const std::string& constraint = cntl->http_request().unresolved_path();
-
if (constraint.empty()) {
#ifdef BRPC_BTHREAD_TRACER
- os << "Use /bthreads/<bthread_id> or /bthreads/<bthread_id>?st=1 for
stack trace";
+ os << "Use /bthreads/<bthread_id> or /bthreads/<bthread_id>?st=1 for
stack trace\n";
+ os << "To check all living bthread, use /bthreads/all or
/bthreads/all?st=1 for stack trace\n";
#else
- os << "Use /bthreads/<bthread_id>";
+ os << "Use /bthreads/<bthread_id>\n";
+ os << "To check all living bthread, use /bthreads/all\n";
#endif // BRPC_BTHREAD_TRACER
} else {
- char* endptr = NULL;
- bthread_t tid = strtoull(constraint.c_str(), &endptr, 10);
- if (*endptr == '\0' || *endptr == '/' || *endptr == '?') {
- ::bthread::print_task(os, tid);
-
+ bool enable_trace = false;
#ifdef BRPC_BTHREAD_TRACER
const std::string* st = cntl->http_request().uri().GetQuery("st");
if (NULL != st && *st == "1") {
- os << "\nbthread call stack:\n";
- ::bthread::stack_trace(os, tid);
+ enable_trace = true;
}
#endif // BRPC_BTHREAD_TRACER
- } else {
- cntl->SetFailed(ENOMETHOD, "path=%s is not a bthread id",
+ char* endptr = NULL;
+ bthread_t tid = strtoull(constraint.c_str(), &endptr, 10);
+ if (*endptr == '\0' || *endptr == '/' || *endptr == '?') {
+ ::bthread::print_task(os, tid, enable_trace);
+ }
+ else if (constraint != "all" && constraint != "all?st=1") {
+ cntl->SetFailed(ENOMETHOD, "path=%s is not a bthread id or all, or
all?st=1\n",
constraint.c_str());
+ } else {
+ ::bthread::print_living_tasks(os, enable_trace);
}
+
}
os.move_to(cntl->response_attachment());
}
diff --git a/src/brpc/builtin/index_service.cpp
b/src/brpc/builtin/index_service.cpp
index 62cb87c1..3b1aa3bc 100644
--- a/src/brpc/builtin/index_service.cpp
+++ b/src/brpc/builtin/index_service.cpp
@@ -158,7 +158,7 @@ void
IndexService::default_method(::google::protobuf::RpcController* controller,
<< Path("/health", html_addr) << " : Test healthy" << NL
<< Path("/vlog", html_addr) << " : List all VLOG callsites" << NL
<< Path("/sockets", html_addr) << " : Check status of a Socket" << NL
- << Path("/bthreads", html_addr) << " : Check status of a bthread" << NL
+ << Path("/bthreads", html_addr) << " : Check status of a bthread or all
living bthreads" << NL
<< Path("/ids", html_addr) << " : Check status of a bthread_id" << NL
<< Path("/protobufs", html_addr) << " : List all protobuf services and
messages" << NL
<< Path("/list", html_addr) << " : json signature of methods" << NL
diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index a5f178e3..085d814d 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -32,6 +32,8 @@
#include "bthread/bthread.h"
namespace bthread {
+extern void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
+ bool ignore_not_matched = false);
static bool validate_bthread_concurrency(const char*, int32_t val) {
// bthread_setconcurrency sets the flag on success path which should
@@ -192,8 +194,26 @@ std::string stack_trace(bthread_t tid) {
}
return c->stack_trace(tid);
}
+
#endif // BRPC_BTHREAD_TRACER
+// Print all living (started and not finished) bthreads
+void print_living_tasks(std::ostream& os, bool enable_trace) {
+ TaskControl* c = get_task_control();
+ if (NULL == c) {
+ os << "TaskControl has not been created";
+ return;
+ }
+ auto tids = c->get_living_bthreads();
+ if (tids.empty()) {
+ os << "No living bthreads\n";
+ return;
+ }
+ for (auto tid : tids) {
+ print_task(os, tid, enable_trace, true);
+ }
+}
+
static int add_workers_for_each_tag(int num) {
int added = 0;
auto c = get_task_control();
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 0b34955b..feeeb7d4 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -192,6 +192,7 @@ TaskControl::TaskControl()
, _signal_per_second(&_cumulated_signal_count)
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
+ , _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
, _priority_queues(FLAGS_task_group_ntags)
, _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
, _tagged_pl(FLAGS_task_group_ntags)
@@ -587,4 +588,23 @@ bvar::LatencyRecorder*
TaskControl::create_exposed_pending_time() {
return pt;
}
+std::vector<bthread_t> TaskControl::get_living_bthreads() {
+ std::vector<bthread_t> living_bthread_ids;
+ living_bthread_ids.reserve(1024);
+ butil::for_each_resource<TaskMeta>([&living_bthread_ids](TaskMeta* m) {
+ // filter out those bthreads created by bthread_start* functions,
+ // i.e. not those created internally to run main task as they are
+ // opaque to user.
+ if (m && m->fn) {
+ // determine whether the bthread is living by checking version
+ const uint32_t given_ver = get_version(m->tid);
+ BAIDU_SCOPED_LOCK(m->version_lock);
+ if (given_ver == *m->version_butex) {
+ living_bthread_ids.push_back(m->tid);
+ }
+ }
+ });
+ return living_bthread_ids;
+}
+
} // namespace bthread
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index 4d666025..439c96db 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -101,6 +101,7 @@ public:
_priority_queues[tag].push(tid);
}
+ std::vector<bthread_t> get_living_bthreads();
private:
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 67f029a0..40daaa1a 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -36,6 +36,7 @@
#include "bthread/task_control.h"
#include "bthread/task_group.h"
#include "bthread/timer_thread.h"
+#include "bthread/bthread.h"
#ifdef __x86_64__
#include <x86intrin.h>
@@ -678,7 +679,7 @@ void TaskGroup::ending_sched(TaskGroup** pg) {
}
}
}
- sched_to(pg, next_meta, true);
+ sched_to(pg, next_meta);
}
void TaskGroup::sched(TaskGroup** pg) {
@@ -699,7 +700,7 @@ void TaskGroup::sched(TaskGroup** pg) {
extern void CheckBthreadScheSafety();
-void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending)
{
+void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
TaskGroup* g = *pg;
#ifndef NDEBUG
if ((++g->_sched_recursive_guard) > 1) {
@@ -1088,10 +1089,11 @@ void TaskGroup::yield(TaskGroup** pg) {
sched(pg);
}
-void print_task(std::ostream& os, bthread_t tid) {
+void print_task(std::ostream& os, bthread_t tid, bool enable_trace,
+ bool ignore_not_matched = false) {
TaskMeta* const m = TaskGroup::address_meta(tid);
if (m == NULL) {
- os << "bthread=" << tid << " : never existed";
+ os << "bthread=" << tid << " : never existed\n";
return;
}
const uint32_t given_ver = get_version(tid);
@@ -1127,7 +1129,9 @@ void print_task(std::ostream& os, bthread_t tid) {
}
}
if (!matched) {
- os << "bthread=" << tid << " : not exist now";
+ if (!ignore_not_matched) {
+ os << "bthread=" << tid << " : not exist now\n";
+ }
} else {
os << "bthread=" << tid << " :\nstop=" << stop
<< "\ninterrupted=" << interrupted
@@ -1136,6 +1140,7 @@ void print_task(std::ostream& os, bthread_t tid) {
<< "\narg=" << (void*)arg
<< "\nattr={stack_type=" << attr.stack_type
<< " flags=" << attr.flags
+ << " specified_tag=" << attr.tag
<< " keytable_pool=" << attr.keytable_pool
<< "}\nhas_tls=" << has_tls
<< "\nuptime_ns=" << butil::cpuwide_time_ns() - cpuwide_start_ns
@@ -1145,8 +1150,13 @@ void print_task(std::ostream& os, bthread_t tid) {
<< "\nstatus=" << status
<< "\ntraced=" << traced
<< "\nworker_tid=" << worker_tid;
-#else
- ;
+ if (enable_trace) {
+ os << "\nbthread call stack:\n";
+ stack_trace(os, tid);
+ }
+ os << "\n\n";
+ #else
+ << "\n\n";
(void)status;(void)traced;(void)worker_tid;
#endif // BRPC_BTHREAD_TRACER
}
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index 958f81d7..f3e1d402 100644
--- a/src/bthread/task_group.h
+++ b/src/bthread/task_group.h
@@ -111,7 +111,7 @@ public:
// Suspend caller and run bthread `next_tid' in TaskGroup *pg.
// Purpose of this function is to avoid pushing `next_tid' to _rq and
// then being popped by sched(pg), which is not necessary.
- static void sched_to(TaskGroup** pg, TaskMeta* next_meta, bool cur_ending);
+ static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
static void sched_to(TaskGroup** pg, bthread_t next_tid);
static void exchange(TaskGroup** pg, TaskMeta* next_meta);
diff --git a/src/bthread/task_group_inl.h b/src/bthread/task_group_inl.h
index aec1a284..faa5683b 100644
--- a/src/bthread/task_group_inl.h
+++ b/src/bthread/task_group_inl.h
@@ -56,7 +56,7 @@ inline void TaskGroup::exchange(TaskGroup** pg, TaskMeta*
next_meta) {
? ready_to_run_in_worker_ignoresignal
: ready_to_run_in_worker),
&args);
- TaskGroup::sched_to(pg, next_meta, false);
+ TaskGroup::sched_to(pg, next_meta);
}
inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
@@ -79,7 +79,7 @@ inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t
next_tid) {
}
}
// Update now_ns only when wait_task did yield.
- sched_to(pg, next_meta, false);
+ sched_to(pg, next_meta);
}
inline void TaskGroup::push_rq(bthread_t tid) {
diff --git a/src/bthread/task_tracer.cpp b/src/bthread/task_tracer.cpp
index 2602fe31..f9d14f02 100644
--- a/src/bthread/task_tracer.cpp
+++ b/src/bthread/task_tracer.cpp
@@ -147,7 +147,7 @@ void TaskTracer::set_status(TaskStatus s, TaskMeta* m) {
tracing = m->traced;
// bthread is scheduled for the first time.
- if (TASK_STATUS_READY == s || NULL == m->stack) {
+ if (TASK_STATUS_READY == s && NULL == m->stack) {
m->status = TASK_STATUS_FIRST_READY;
} else {
m->status = s;
diff --git a/src/butil/resource_pool.h b/src/butil/resource_pool.h
index f5667938..e1c09faf 100644
--- a/src/butil/resource_pool.h
+++ b/src/butil/resource_pool.h
@@ -132,6 +132,11 @@ template <typename T> ResourcePoolInfo
describe_resources() {
return ResourcePool<T>::singleton()->describe_resources();
}
+// Traverse all allocated resources typed T and apply specified operation
+template <typename T, typename F>
+void for_each_resource(F const& f) {
+ ResourcePool<T>::singleton()->for_each_resource(f);
+}
} // namespace butil
#endif // BUTIL_RESOURCE_POOL_H
diff --git a/src/butil/resource_pool_inl.h b/src/butil/resource_pool_inl.h
index 3274e9b7..82649103 100644
--- a/src/butil/resource_pool_inl.h
+++ b/src/butil/resource_pool_inl.h
@@ -307,6 +307,28 @@ public:
const size_t n = ResourcePoolFreeChunkMaxItem<T>::value();
return n < FREE_CHUNK_NITEM ? n : FREE_CHUNK_NITEM;
}
+
+ template <typename F>
+ void for_each_resource(F const& f) {
+ for (size_t i = 0; i < _ngroup.load(butil::memory_order_acquire); ++i)
{
+ BlockGroup* bg =
_block_groups[i].load(butil::memory_order_consume);
+ if (NULL == bg) {
+ break;
+ }
+ size_t nblock =
std::min(bg->nblock.load(butil::memory_order_relaxed),
+ RP_GROUP_NBLOCK);
+ for (size_t j = 0; j < nblock; ++j) {
+ Block* b = bg->blocks[j].load(butil::memory_order_consume);
+ if (NULL != b) {
+ for (size_t k = 0; k < b->nitem; ++k) {
+ auto item = b->items + k;
+ T* obj = (T*)item->void_data();
+ f(obj);
+ }
+ }
+ }
+ }
+ }
// Number of all allocated objects, including being used and free.
ResourcePoolInfo describe_resources() const {
diff --git a/test/brpc_builtin_service_unittest.cpp
b/test/brpc_builtin_service_unittest.cpp
index 76b99811..4ba8a039 100644
--- a/test/brpc_builtin_service_unittest.cpp
+++ b/test/brpc_builtin_service_unittest.cpp
@@ -852,6 +852,34 @@ void* bthread_trace(void*) {
}
#endif // BRPC_BTHREAD_TRACER
+// check all living bthreads without need to specify bthread id
+bool check_all_bthreads(bthread_t expected_th, bool enable_trace) {
+ brpc::BthreadsService service;
+ brpc::BthreadsRequest req;
+ brpc::BthreadsResponse res;
+ ClosureChecker done;
+ brpc::Controller cntl;
+ std::string all_string("all");
+ if (enable_trace) {
+ all_string.append("?st=1");
+ }
+ cntl.http_request()._unresolved_path = all_string;
+ cntl.http_request().uri().SetHttpURL("/bthreads/" + all_string);
+ service.default_method(&cntl, &req, &res, &done);
+ EXPECT_FALSE(cntl.Failed());
+ const std::string& content = cntl.response_attachment().to_string();
+ std::string expected_str = butil::string_printf("bthread=%llu",
+ (unsigned long long)expected_th);
+ bool ok = content.find("stop=0") != std::string::npos &&
+ content.find(expected_str) != std::string::npos;
+ if (ok && enable_trace) {
+ ok = content.find("bthread_trace") != std::string::npos;
+ } else if (ok && !enable_trace) {
+ ok = content.find("bthread_trace") == std::string::npos;
+ }
+ return ok;
+}
+
TEST_F(BuiltinServiceTest, bthreads) {
brpc::BthreadsService service;
brpc::BthreadsRequest req;
@@ -869,8 +897,16 @@ TEST_F(BuiltinServiceTest, bthreads) {
cntl.http_request()._unresolved_path = "not_valid";
service.default_method(&cntl, &req, &res, &done);
EXPECT_TRUE(cntl.Failed());
- CheckErrorText(cntl, "is not a bthread id");
+ CheckErrorText(cntl, "is not a bthread id or all");
}
+ {
+ ClosureChecker done;
+ brpc::Controller cntl;
+ cntl.http_request()._unresolved_path = "all";
+ service.default_method(&cntl, &req, &res, &done);
+ EXPECT_FALSE(cntl.Failed());
+ CheckContent(cntl, "stop=0");
+ }
{
bthread_t th;
EXPECT_EQ(0, bthread_start_background(&th, NULL, dummy_bthread, NULL));
@@ -885,13 +921,14 @@ TEST_F(BuiltinServiceTest, bthreads) {
}
#ifdef BRPC_BTHREAD_TRACER
- bool ok = false;
+ bool ok = false, check_all_ok = false;
for (int i = 0; i < 10; ++i) {
bthread_t th;
EXPECT_EQ(0, bthread_start_background(&th, NULL, bthread_trace, NULL));
while (!g_bthread_trace_start) {
bthread_usleep(1000 * 10);
}
+ LOG(INFO) << "start bthread = " << th;
ClosureChecker done;
brpc::Controller cntl;
std::string id_string;
@@ -899,16 +936,20 @@ TEST_F(BuiltinServiceTest, bthreads) {
cntl.http_request().uri().SetHttpURL("/bthreads/" + id_string);
cntl.http_request()._unresolved_path = id_string;
service.default_method(&cntl, &req, &res, &done);
- g_bthread_trace_stop = true;
EXPECT_FALSE(cntl.Failed());
const std::string& content = cntl.response_attachment().to_string();
ok = content.find("stop=0") != std::string::npos &&
content.find("bthread_trace") != std::string::npos;
- if (ok) {
+ check_all_ok = check_all_bthreads(th, true) && check_all_bthreads(th,
false);
+ g_bthread_trace_stop = true;
+ bthread_join(th, NULL);
+ // the `bthread_trace` bthread should not be queried now
+ EXPECT_TRUE(!check_all_bthreads(th, true) && !check_all_bthreads(th,
false));
+ if (ok && check_all_ok) {
break;
}
}
- ASSERT_TRUE(ok);
+ ASSERT_TRUE(ok && check_all_ok);
#endif // BRPC_BTHREAD_TRACER
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]