This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 02703638 Fix thread safety of AgentCombiner (#2949)
02703638 is described below

commit 02703638aa9ac68b68350d025afc10e0b20d8371
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Thu Apr 17 20:12:57 2025 +0800

    Fix thread safety of AgentCombiner (#2949)
---
 src/bvar/detail/combiner.h     | 58 ++++++++++++++++++++++++------------------
 src/bvar/detail/percentile.cpp |  8 +++---
 src/bvar/detail/percentile.h   |  5 ++--
 src/bvar/recorder.h            | 21 +++++++--------
 src/bvar/reducer.h             | 32 ++++++++++-------------
 5 files changed, 64 insertions(+), 60 deletions(-)

diff --git a/src/bvar/detail/combiner.h b/src/bvar/detail/combiner.h
index 6a6ab803..07baa891 100644
--- a/src/bvar/detail/combiner.h
+++ b/src/bvar/detail/combiner.h
@@ -22,6 +22,7 @@
 
 #include <string>                       // std::string
 #include <vector>                       // std::vector
+#include <memory>
 #include "butil/atomicops.h"             // butil::atomic
 #include "butil/scoped_lock.h"           // BAIDU_SCOPED_LOCK
 #include "butil/type_traits.h"           // butil::add_cr_non_integral
@@ -42,7 +43,6 @@ public:
     typedef typename Combiner::Agent agent_type;
 
     GlobalValue(agent_type* a, Combiner* c) : _a(a), _c(c) {}
-    ~GlobalValue() {}
 
     // Call this method to unlock tls element and lock the combiner.
     // Unlocking tls element avoids potential deadlock with
@@ -153,24 +153,26 @@ private:
 };
 
 template <typename ResultTp, typename ElementTp, typename BinaryOp>
-class AgentCombiner {
+class AgentCombiner
+    : public std::enable_shared_from_this<AgentCombiner<ResultTp, ElementTp, BinaryOp>> {
+
 public:
     typedef ResultTp result_type;
     typedef ElementTp element_type;
     typedef AgentCombiner<ResultTp, ElementTp, BinaryOp> self_type;
+    typedef std::shared_ptr<self_type> self_shared_type;
+    typedef std::weak_ptr<self_type> self_weak_type;
 friend class GlobalValue<self_type>;
-    
-    struct Agent : public butil::LinkNode<Agent> {
-        Agent() : combiner(NULL) {}
 
+    struct Agent : public butil::LinkNode<Agent> {
         ~Agent() {
-            if (combiner) {
-                combiner->commit_and_erase(this);
-                combiner = NULL;
+            self_shared_type c = combiner.lock();
+            if (NULL != c) {
+                c->commit_and_erase(this);
             }
         }
         
-        void reset(const ElementTp& val, self_type* c) {
+        void reset(const ElementTp& val, self_shared_type c) {
             combiner = c;
             element.store(val);
         }
@@ -203,13 +205,20 @@ friend class GlobalValue<self_type>;
         // 
         // NOTE: Only available to non-atomic types.
         template <typename Op>
-        void merge_global(const Op &op) {
-            GlobalValue<self_type> g(this, combiner);
-            element.merge_global(op, g);
+        void merge_global(const Op &op, self_shared_type c = NULL) {
+            if (NULL == c) {
+                c = combiner.lock();
+            }
+            if (NULL != c) {
+                GlobalValue<self_type> g(this, c.get());
+                element.merge_global(op, g);
+            }
         }
 
-        self_type *combiner;
         ElementContainer<ElementTp> element;
+    private:
+    friend class AgentCombiner<ResultTp, ElementTp, BinaryOp>;
+        self_weak_type combiner;
     };
 
     typedef detail::AgentGroup<Agent> AgentGroup;
@@ -245,10 +254,10 @@ friend class GlobalValue<self_type>;
         return ret;
     }
 
-    typename butil::add_cr_non_integral<ElementTp>::type element_identity() const 
-    { return _element_identity; }
-    typename butil::add_cr_non_integral<ResultTp>::type result_identity() const 
-    { return _result_identity; }
+    typename butil::add_cr_non_integral<ElementTp>::type
+    element_identity() const { return _element_identity; }
+    typename butil::add_cr_non_integral<ResultTp>::type
+    result_identity() const { return _result_identity; }
 
     // [Threadsafe] May be called from anywhere.
     ResultTp reset_all_agents() {
@@ -265,7 +274,7 @@ friend class GlobalValue<self_type>;
     }
 
     // Always called from the thread owning the agent.
-    void commit_and_erase(Agent *agent) {
+    void commit_and_erase(Agent* agent) {
         if (NULL == agent) {
             return;
         }
@@ -279,7 +288,7 @@ friend class GlobalValue<self_type>;
     }
 
     // Always called from the thread owning the agent
-    void commit_and_clear(Agent *agent) {
+    void commit_and_clear(Agent* agent) {
         if (NULL == agent) {
             return;
         }
@@ -290,7 +299,7 @@ friend class GlobalValue<self_type>;
     }
 
     // We need this function to be as fast as possible.
-    inline Agent* get_or_create_tls_agent() {
+    Agent* get_or_create_tls_agent() {
         Agent* agent = AgentGroup::get_tls_agent(_id);
         if (!agent) {
             // Create the agent
@@ -300,10 +309,10 @@ friend class GlobalValue<self_type>;
                 return NULL;
             }
         }
-        if (agent->combiner) {
+        if (!agent->combiner.expired()) {
             return agent;
         }
-        agent->reset(_element_identity, this);
+        agent->reset(_element_identity, this->shared_from_this());
         // TODO: Is uniqueness-checking necessary here?
         {
             butil::AutoLock guard(_lock);
@@ -314,11 +323,10 @@ friend class GlobalValue<self_type>;
 
     void clear_all_agents() {
         butil::AutoLock guard(_lock);
-        // reseting agents is must because the agent object may be reused.
+        // Resting agents is must because the agent object may be reused.
         // Set element to be default-constructed so that if it's non-pod,
         // internal allocations should be released.
-        for (butil::LinkNode<Agent>* 
-                node = _agents.head(); node != _agents.end();) {
+        for (butil::LinkNode<Agent>* node = _agents.head(); node != _agents.end();) {
             node->value()->reset(ElementTp(), NULL);
             butil::LinkNode<Agent>* const saved_next =  node->next();
             node->RemoveFromList();
diff --git a/src/bvar/detail/percentile.cpp b/src/bvar/detail/percentile.cpp
index e0412cbe..37181cc3 100644
--- a/src/bvar/detail/percentile.cpp
+++ b/src/bvar/detail/percentile.cpp
@@ -85,9 +85,8 @@ private:
     int64_t _latency;
 };
 
-Percentile::Percentile() : _combiner(NULL), _sampler(NULL) {
-    _combiner = new combiner_type;
-}
+Percentile::Percentile()
+    : _combiner(std::make_shared<combiner_type>()), _sampler(NULL) {}
 
 Percentile::~Percentile() {
     // Have to destroy sampler first to avoid the race between destruction and
@@ -96,7 +95,6 @@ Percentile::~Percentile() {
         _sampler->destroy();
         _sampler = NULL;
     }
-    delete _combiner;
 }
 
 Percentile::value_type Percentile::reset() {
@@ -126,7 +124,7 @@ Percentile &Percentile::operator<<(int64_t latency) {
         }
         return *this;
     }
-    agent->merge_global(AddLatency(latency));
+    agent->merge_global(AddLatency(latency), _combiner);
     return *this;
 }
 
diff --git a/src/bvar/detail/percentile.h b/src/bvar/detail/percentile.h
index 5fcc180a..186103b4 100644
--- a/src/bvar/detail/percentile.h
+++ b/src/bvar/detail/percentile.h
@@ -462,6 +462,7 @@ public:
     typedef AgentCombiner <GlobalPercentileSamples,
                            ThreadLocalPercentileSamples,
                            AddPercentileSamples>            combiner_type;
+    typedef typename combiner_type::self_shared_type        shared_combiner_type;
     typedef combiner_type::Agent                            agent_type;
     Percentile();
     ~Percentile();
@@ -494,8 +495,8 @@ public:
 private:
     DISALLOW_COPY_AND_ASSIGN(Percentile);
 
-    combiner_type*          _combiner;
-    sampler_type*           _sampler;
+    shared_combiner_type _combiner;
+    sampler_type* _sampler;
     std::string _debug_name;
 };
 
diff --git a/src/bvar/recorder.h b/src/bvar/recorder.h
index 9b73a19b..c2c18bd1 100644
--- a/src/bvar/recorder.h
+++ b/src/bvar/recorder.h
@@ -113,9 +113,10 @@ public:
     };
     
     typedef detail::AgentCombiner<Stat, uint64_t, AddToStat> combiner_type;
+    typedef typename combiner_type::self_shared_type shared_combiner_type;
     typedef combiner_type::Agent agent_type;
 
-    IntRecorder() : _sampler(NULL) {}
+    IntRecorder() : _combiner(std::make_shared<combiner_type>()), _sampler(NULL) {}
 
     explicit IntRecorder(const butil::StringPiece& name) : _sampler(NULL) {
         expose(name);
@@ -126,7 +127,7 @@ public:
         expose_as(prefix, name);
     }
 
-    ~IntRecorder() {
+    ~IntRecorder() override {
         hide();
         if (_sampler) {
             _sampler->destroy();
@@ -138,19 +139,19 @@ public:
     IntRecorder& operator<<(int64_t/*note*/ sample);
 
     int64_t average() const {
-        return _combiner.combine_agents().get_average_int();
+        return _combiner->combine_agents().get_average_int();
     }
 
     double average(double) const {
-        return _combiner.combine_agents().get_average_double();
+        return _combiner->combine_agents().get_average_double();
     }
 
     Stat get_value() const {
-        return _combiner.combine_agents();
+        return _combiner->combine_agents();
     }
     
     Stat reset() {
-        return _combiner.reset_all_agents();
+        return _combiner->reset_all_agents();
     }
 
     AddStat op() const { return AddStat(); }
@@ -160,7 +161,7 @@ public:
         os << get_value();
     }
 
-    bool valid() const { return _combiner.valid(); }
+    bool valid() const { return _combiner->valid(); }
     
     sampler_type* get_sampler() {
         if (NULL == _sampler) {
@@ -230,7 +231,7 @@ private:
     }
 
 private:
-    combiner_type           _combiner;
+    shared_combiner_type    _combiner;
     sampler_type*           _sampler;
     std::string             _debug_name;
 };
@@ -258,7 +259,7 @@ inline IntRecorder& IntRecorder::operator<<(int64_t sample) {
                        << (void*)this << ") " << reason;
         }
     }
-    agent_type* agent = _combiner.get_or_create_tls_agent();
+    agent_type* agent = _combiner->get_or_create_tls_agent();
     if (BAIDU_UNLIKELY(!agent)) {
         LOG(FATAL) << "Fail to create agent";
         return *this;
@@ -276,7 +277,7 @@ inline IntRecorder& IntRecorder::operator<<(int64_t sample) {
             // Although agent->element might have been cleared at this 
             // point, it is just OK because the very value is 0 in
             // this case
-            agent->combiner->commit_and_clear(agent);
+            _combiner->commit_and_clear(agent);
             sum = 0;
             num = 0;
             n = 0;
diff --git a/src/bvar/reducer.h b/src/bvar/reducer.h
index fbd4fa78..ccf78054 100644
--- a/src/bvar/reducer.h
+++ b/src/bvar/reducer.h
@@ -69,13 +69,13 @@ template <typename T, typename Op, typename InvOp = detail::VoidOp>
 class Reducer : public Variable {
 public:
     typedef typename detail::AgentCombiner<T, T, Op> combiner_type;
+    typedef typename combiner_type::self_shared_type shared_combiner_type;
     typedef typename combiner_type::Agent agent_type;
     typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type;
     class SeriesSampler : public detail::Sampler {
     public:
         SeriesSampler(Reducer* owner, const Op& op)
             : _owner(owner), _series(op) {}
-        ~SeriesSampler() {}
         void take_sample() override { _series.append(_owner->get_value()); }
         void describe(std::ostream& os) { _series.describe(os, NULL); }
     private:
@@ -85,16 +85,12 @@ public:
 
 public:
     // The `identify' must satisfy: identity Op a == a
-    Reducer(typename butil::add_cr_non_integral<T>::type identity = T(),
-            const Op& op = Op(),
-            const InvOp& inv_op = InvOp())
-        : _combiner(identity, identity, op)
-        , _sampler(NULL)
-        , _series_sampler(NULL)
-        , _inv_op(inv_op) {
-    }
+    explicit Reducer(typename butil::add_cr_non_integral<T>::type identity = T(),
+                     const Op& op = Op(), const InvOp& inv_op = InvOp())
+        : _combiner(std::make_shared<combiner_type>(identity, identity, op))
+        , _sampler(NULL) , _series_sampler(NULL) , _inv_op(inv_op) {}
 
-    ~Reducer() {
+    ~Reducer() override {
         // Calling hide() manually is a MUST required by Variable.
         hide();
         if (_sampler) {
@@ -119,13 +115,13 @@ public:
             << "You should not call Reducer<" << butil::class_name_str<T>()
             << ", " << butil::class_name_str<Op>() << ">::get_value() when a"
             << " Window<> is used because the operator does not have inverse.";
-        return _combiner.combine_agents();
+        return _combiner->combine_agents();
     }
 
 
     // Reset the reduced value to T().
     // Returns the reduced value before reset.
-    T reset() { return _combiner.reset_all_agents(); }
+    T reset() { return _combiner->reset_all_agents(); }
 
     void describe(std::ostream& os, bool quote_string) const override {
         if (butil::is_same<T, std::string>::value && quote_string) {
@@ -140,10 +136,10 @@ public:
 #endif
 
     // True if this reducer is constructed successfully.
-    bool valid() const { return _combiner.valid(); }
+    bool valid() const { return _combiner->valid(); }
 
     // Get instance of Op.
-    const Op& op() const { return _combiner.op(); }
+    const Op& op() const { return _combiner->op(); }
     const InvOp& inv_op() const { return _inv_op; }
     
     sampler_type* get_sampler() {
@@ -174,14 +170,14 @@ protected:
             !butil::is_same<InvOp, detail::VoidOp>::value &&
             !butil::is_same<T, std::string>::value &&
             FLAGS_save_series) {
-            _series_sampler = new SeriesSampler(this, _combiner.op());
+            _series_sampler = new SeriesSampler(this, _combiner->op());
             _series_sampler->schedule();
         }
         return rc;
     }
 
 private:
-    combiner_type   _combiner;
+    shared_combiner_type _combiner;
     sampler_type* _sampler;
     SeriesSampler* _series_sampler;
     InvOp _inv_op;
@@ -191,12 +187,12 @@ template <typename T, typename Op, typename InvOp>
 inline Reducer<T, Op, InvOp>& Reducer<T, Op, InvOp>::operator<<(
     typename butil::add_cr_non_integral<T>::type value) {
     // It's wait-free for most time
-    agent_type* agent = _combiner.get_or_create_tls_agent();
+    agent_type* agent = _combiner->get_or_create_tls_agent();
     if (__builtin_expect(!agent, 0)) {
         LOG(FATAL) << "Fail to create agent";
         return *this;
     }
-    agent->element.modify(_combiner.op(), value);
+    agent->element.modify(_combiner->op(), value);
     return *this;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to