This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 07083331 Bugfix: TaskTracer deadlocks due to ABA problem (#3115)
07083331 is described below

commit 07083331e1496746f4b1cfa54d0b6364f321be34
Author: Bright Chen <[email protected]>
AuthorDate: Tue Oct 14 14:06:56 2025 +0800

    Bugfix: TaskTracer deadlocks due to ABA problem (#3115)
---
 src/bthread/task_meta.h                  |  4 ++
 src/bthread/task_tracer.cpp              | 92 ++++++++++++--------------------
 src/bthread/task_tracer.h                |  8 +--
 test/brpc_http_rpc_protocol_unittest.cpp |  4 +-
 4 files changed, 41 insertions(+), 67 deletions(-)

diff --git a/src/bthread/task_meta.h b/src/bthread/task_meta.h
index a4ed42bf..1b77c0b6 100644
--- a/src/bthread/task_meta.h
+++ b/src/bthread/task_meta.h
@@ -112,6 +112,8 @@ struct TaskMeta {
     TaskStatus status{TASK_STATUS_UNKNOWN};
     // Whether bthread is traced?
     bool traced{false};
+    // [Not Reset] guarantee tracing completion before jumping.
+    pthread_mutex_t trace_lock{};
     // Worker thread id.
     pthread_t worker_tid{};
 
@@ -122,9 +124,11 @@ public:
         pthread_spin_init(&version_lock, 0);
         version_butex = butex_create_checked<uint32_t>();
         *version_butex = 1;
+        pthread_mutex_init(&trace_lock, NULL);
     }
         
     ~TaskMeta() {
+        pthread_mutex_destroy(&trace_lock);
         butex_destroy(version_butex);
         version_butex = NULL;
         pthread_spin_destroy(&version_lock);
diff --git a/src/bthread/task_tracer.cpp b/src/bthread/task_tracer.cpp
index 415670aa..2602fe31 100644
--- a/src/bthread/task_tracer.cpp
+++ b/src/bthread/task_tracer.cpp
@@ -119,6 +119,7 @@ void TaskTracer::Result::OutputToStream(std::ostream& os) const {
 
 bool TaskTracer::Init() {
     if (_trace_time.expose("bthread_trace_time") != 0) {
+        LOG(ERROR) << "Fail to expose bthread_trace_time";
         return false;
     }
     if (!RegisterSignalHandler()) {
@@ -136,7 +137,7 @@ void TaskTracer::set_status(TaskStatus s, TaskMeta* m) {
     CHECK_NE(TASK_STATUS_RUNNING, s) << "Use `set_running_status' instead";
     CHECK_NE(TASK_STATUS_END, s) << "Use `set_end_status_unsafe' instead";
 
-    bool tracing;
+    bool tracing = false;
     {
         BAIDU_SCOPED_LOCK(m->version_lock);
         if (TASK_STATUS_UNKNOWN == m->status && TASK_STATUS_JUMPING == s) {
@@ -182,31 +183,8 @@ void TaskTracer::Trace(std::ostream& os, bthread_t tid) {
 }
 
 void TaskTracer::WaitForTracing(TaskMeta* m) {
-    BAIDU_SCOPED_LOCK(_mutex);
-    while (m->traced) {
-        _cond.Wait();
-    }
-}
-
-TaskStatus TaskTracer::WaitForJumping(TaskMeta* m) {
-    // Reasons for not using locks here:
-    // 1. It is necessary to lock before jump_stack, unlock after jump_stack,
-    //    which involves two different bthread and is prone to errors.
-    // 2. jump_stack is fast.
-    int i = 0;
-    do {
-        // The bthread is jumping now, spin until it finishes.
-        if (i++ < 30) {
-            cpu_relax();
-        } else {
-            sched_yield();
-        }
-
-        BAIDU_SCOPED_LOCK(m->version_lock);
-        if (TASK_STATUS_JUMPING != m->status) {
-            return m->status;
-        }
-    } while (true);
+    BAIDU_SCOPED_LOCK(m->trace_lock);
+    // Acquiring trace_lock means tracing is done.
 }
 
 TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
@@ -224,25 +202,44 @@ TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
     // Make sure only one bthread is traced at a time.
     BAIDU_SCOPED_LOCK(_trace_request_mutex);
 
+    // The chance to remove unused SignalSyncs.
+    auto iter = std::remove_if(
+        _inuse_signal_syncs.begin(), _inuse_signal_syncs.end(),
+        [](butil::intrusive_ptr<SignalSync>& sync) {
+            return sync->ref_count() == 1;
+    });
+    _inuse_signal_syncs.erase(iter, _inuse_signal_syncs.end());
+
     TaskMeta* m = TaskGroup::address_meta(tid);
     if (NULL == m) {
         return Result::MakeErrorResult("bthread=%d never existed", tid);
     }
 
-    BAIDU_SCOPED_LOCK(_mutex);
+    BAIDU_SCOPED_LOCK(m->trace_lock);
     TaskStatus status;
     pthread_t worker_tid;
     const uint32_t given_version = get_version(tid);
     {
         BAIDU_SCOPED_LOCK(m->version_lock);
-        if (given_version == *m->version_butex) {
-            // Start tracing.
-            m->traced = true;
-            worker_tid = m->worker_tid;
-            status = m->status;
-        } else {
+        if (given_version != *m->version_butex) {
             return Result::MakeErrorResult("bthread=%d not exist now", tid);
         }
+
+        status = m->status;
+        if (TASK_STATUS_UNKNOWN == status) {
+            return Result::MakeErrorResult("bthread=%d not exist now", tid);
+        } else if (TASK_STATUS_CREATED == status) {
+            return Result::MakeErrorResult("bthread=%d has just been created", 
tid);
+        } else if (TASK_STATUS_FIRST_READY == status) {
+            return Result::MakeErrorResult("bthread=%d is scheduled for the 
first time", tid);
+        } else if (TASK_STATUS_END == status) {
+            return Result::MakeErrorResult("bthread=%d has ended", tid);
+        } else if (TASK_STATUS_JUMPING == status) {
+            return Result::MakeErrorResult("bthread=%d is jumping stack", tid);
+        }
+        // Start tracing.
+        m->traced = true;
+        worker_tid = m->worker_tid;
     }
 
     BRPC_SCOPE_EXIT {
@@ -252,31 +249,16 @@ TaskTracer::Result TaskTracer::TraceImpl(bthread_t tid) {
             // tracing completion, so given_version != *m->version_butex is OK.
             m->traced = false;
         }
-        // Wake up the waiting worker thread to jump.
-        _cond.Signal();
     };
 
-    if (TASK_STATUS_UNKNOWN == status) {
-        return Result::MakeErrorResult("bthread=%d not exist now", tid);
-    } else if (TASK_STATUS_CREATED == status) {
-        return Result::MakeErrorResult("bthread=%d has just been created", 
tid);
-    } else if (TASK_STATUS_FIRST_READY == status) {
-        return Result::MakeErrorResult("bthread=%d is scheduled for the first 
time", tid);
-    } else if (TASK_STATUS_END == status) {
-        return Result::MakeErrorResult("bthread=%d has ended", tid);
-    } else if (TASK_STATUS_JUMPING == status) {
-        // Wait for jumping completion.
-        status = WaitForJumping(m);
-    }
-
-    // After jumping, the status may be RUNNING, SUSPENDED, or READY, which is 
traceable.
+    // The status may be RUNNING, SUSPENDED, or READY, which is traceable.
     if (TASK_STATUS_RUNNING == status) {
         return SignalTrace(worker_tid);
     } else if (TASK_STATUS_SUSPENDED == status || TASK_STATUS_READY == status) 
{
         return ContextTrace(m->stack->context);
     }
 
-    return Result::MakeErrorResult("Invalid TaskStatus=%d", status);
+    return Result::MakeErrorResult("Invalid TaskStatus=%d of bthread=%d", 
status, tid);
 }
 
 // Instruct ASan to ignore this function.
@@ -408,14 +390,6 @@ TaskTracer::Result TaskTracer::SignalTrace(pthread_t worker_tid) {
         return Result::MakeErrorResult("Forbid to trace self");
     }
 
-    // Remove unused SignalSyncs.
-    auto iter = std::remove_if(
-        _inuse_signal_syncs.begin(), _inuse_signal_syncs.end(),
-        [](butil::intrusive_ptr<SignalSync>& sync) {
-            return sync->ref_count() == 1;
-    });
-    _inuse_signal_syncs.erase(iter, _inuse_signal_syncs.end());
-
     // Each signal trace has an independent SignalSync to
     // prevent the previous SignalHandler from affecting the new SignalTrace.
     butil::intrusive_ptr<SignalSync> signal_sync(new SignalSync());
@@ -465,6 +439,8 @@ TaskTracer::Result TaskTracer::SignalTrace(pthread_t worker_tid) {
         }
         break;
     }
+    // Remove the successful SignalSync.
+    _inuse_signal_syncs.pop_back();
     
     return signal_sync->result;
 }
diff --git a/src/bthread/task_tracer.h b/src/bthread/task_tracer.h
index 0888c658..be95f3ac 100644
--- a/src/bthread/task_tracer.h
+++ b/src/bthread/task_tracer.h
@@ -47,7 +47,7 @@ public:
     void Trace(std::ostream& os, bthread_t tid);
 
     // When the worker is jumping stack from a bthread to another,
-    void WaitForTracing(TaskMeta* m);
+    static void WaitForTracing(TaskMeta* m);
 
 private:
     // Error number guard used in signal handler.
@@ -94,7 +94,6 @@ private:
         Result result;
     };
 
-    static TaskStatus WaitForJumping(TaskMeta* m);
     Result TraceImpl(bthread_t tid);
 
     unw_cursor_t MakeCursor(bthread_fcontext_t fcontext);
@@ -108,11 +107,6 @@ private:
     // Make sure only one bthread is traced at a time.
     Mutex _trace_request_mutex;
 
-    // For signal trace.
-    // Make sure bthread does not jump stack when it is being traced.
-    butil::Mutex _mutex;
-    butil::ConditionVariable _cond{&_mutex};
-
     // For context trace.
     unw_context_t _context{};
 
diff --git a/test/brpc_http_rpc_protocol_unittest.cpp b/test/brpc_http_rpc_protocol_unittest.cpp
index 5a6839c8..f13c6877 100644
--- a/test/brpc_http_rpc_protocol_unittest.cpp
+++ b/test/brpc_http_rpc_protocol_unittest.cpp
@@ -1701,9 +1701,9 @@ TEST_F(HttpTest, spring_protobuf_content_type) {
     res.Clear();
     cntl2.http_request().set_content_type("application/x-protobuf");
     stub.Echo(&cntl2, &req, &res, nullptr);
-    ASSERT_FALSE(cntl.Failed());
+    ASSERT_FALSE(cntl2.Failed());
     ASSERT_EQ(EXP_RESPONSE, res.message());
-    ASSERT_EQ("application/x-protobuf", cntl.http_response().content_type());
+    ASSERT_EQ("application/x-protobuf", cntl2.http_response().content_type());
 }
 
 TEST_F(HttpTest, dump_http_request) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to