This is an automated email from the ASF dual-hosted git repository. guangmingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 02703638 Fix thread safety of AgentCombiner (#2949) 02703638 is described below commit 02703638aa9ac68b68350d025afc10e0b20d8371 Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Thu Apr 17 20:12:57 2025 +0800 Fix thread safety of AgentCombiner (#2949) --- src/bvar/detail/combiner.h | 58 ++++++++++++++++++++++++------------------ src/bvar/detail/percentile.cpp | 8 +++--- src/bvar/detail/percentile.h | 5 ++-- src/bvar/recorder.h | 21 +++++++-------- src/bvar/reducer.h | 32 ++++++++++------------- 5 files changed, 64 insertions(+), 60 deletions(-) diff --git a/src/bvar/detail/combiner.h b/src/bvar/detail/combiner.h index 6a6ab803..07baa891 100644 --- a/src/bvar/detail/combiner.h +++ b/src/bvar/detail/combiner.h @@ -22,6 +22,7 @@ #include <string> // std::string #include <vector> // std::vector +#include <memory> #include "butil/atomicops.h" // butil::atomic #include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK #include "butil/type_traits.h" // butil::add_cr_non_integral @@ -42,7 +43,6 @@ public: typedef typename Combiner::Agent agent_type; GlobalValue(agent_type* a, Combiner* c) : _a(a), _c(c) {} - ~GlobalValue() {} // Call this method to unlock tls element and lock the combiner. 
// Unlocking tls element avoids potential deadlock with @@ -153,24 +153,26 @@ private: }; template <typename ResultTp, typename ElementTp, typename BinaryOp> -class AgentCombiner { +class AgentCombiner + : public std::enable_shared_from_this<AgentCombiner<ResultTp, ElementTp, BinaryOp>> { + public: typedef ResultTp result_type; typedef ElementTp element_type; typedef AgentCombiner<ResultTp, ElementTp, BinaryOp> self_type; + typedef std::shared_ptr<self_type> self_shared_type; + typedef std::weak_ptr<self_type> self_weak_type; friend class GlobalValue<self_type>; - - struct Agent : public butil::LinkNode<Agent> { - Agent() : combiner(NULL) {} + struct Agent : public butil::LinkNode<Agent> { ~Agent() { - if (combiner) { - combiner->commit_and_erase(this); - combiner = NULL; + self_shared_type c = combiner.lock(); + if (NULL != c) { + c->commit_and_erase(this); } } - void reset(const ElementTp& val, self_type* c) { + void reset(const ElementTp& val, self_shared_type c) { combiner = c; element.store(val); } @@ -203,13 +205,20 @@ friend class GlobalValue<self_type>; // // NOTE: Only available to non-atomic types. 
template <typename Op> - void merge_global(const Op &op) { - GlobalValue<self_type> g(this, combiner); - element.merge_global(op, g); + void merge_global(const Op &op, self_shared_type c = NULL) { + if (NULL == c) { + c = combiner.lock(); + } + if (NULL != c) { + GlobalValue<self_type> g(this, c.get()); + element.merge_global(op, g); + } } - self_type *combiner; ElementContainer<ElementTp> element; + private: + friend class AgentCombiner<ResultTp, ElementTp, BinaryOp>; + self_weak_type combiner; }; typedef detail::AgentGroup<Agent> AgentGroup; @@ -245,10 +254,10 @@ friend class GlobalValue<self_type>; return ret; } - typename butil::add_cr_non_integral<ElementTp>::type element_identity() const - { return _element_identity; } - typename butil::add_cr_non_integral<ResultTp>::type result_identity() const - { return _result_identity; } + typename butil::add_cr_non_integral<ElementTp>::type + element_identity() const { return _element_identity; } + typename butil::add_cr_non_integral<ResultTp>::type + result_identity() const { return _result_identity; } // [Threadsafe] May be called from anywhere. ResultTp reset_all_agents() { @@ -265,7 +274,7 @@ friend class GlobalValue<self_type>; } // Always called from the thread owning the agent. - void commit_and_erase(Agent *agent) { + void commit_and_erase(Agent* agent) { if (NULL == agent) { return; } @@ -279,7 +288,7 @@ friend class GlobalValue<self_type>; } // Always called from the thread owning the agent - void commit_and_clear(Agent *agent) { + void commit_and_clear(Agent* agent) { if (NULL == agent) { return; } @@ -290,7 +299,7 @@ friend class GlobalValue<self_type>; } // We need this function to be as fast as possible. 
- inline Agent* get_or_create_tls_agent() { + Agent* get_or_create_tls_agent() { Agent* agent = AgentGroup::get_tls_agent(_id); if (!agent) { // Create the agent @@ -300,10 +309,10 @@ friend class GlobalValue<self_type>; return NULL; } } - if (agent->combiner) { + if (!agent->combiner.expired()) { return agent; } - agent->reset(_element_identity, this); + agent->reset(_element_identity, this->shared_from_this()); // TODO: Is uniqueness-checking necessary here? { butil::AutoLock guard(_lock); @@ -314,11 +323,10 @@ friend class GlobalValue<self_type>; void clear_all_agents() { butil::AutoLock guard(_lock); - // reseting agents is must because the agent object may be reused. + // Resetting agents is a must because the agent object may be reused. // Set element to be default-constructed so that if it's non-pod, // internal allocations should be released. - for (butil::LinkNode<Agent>* - node = _agents.head(); node != _agents.end();) { + for (butil::LinkNode<Agent>* node = _agents.head(); node != _agents.end();) { node->value()->reset(ElementTp(), NULL); butil::LinkNode<Agent>* const saved_next = node->next(); node->RemoveFromList(); diff --git a/src/bvar/detail/percentile.cpp b/src/bvar/detail/percentile.cpp index e0412cbe..37181cc3 100644 --- a/src/bvar/detail/percentile.cpp +++ b/src/bvar/detail/percentile.cpp @@ -85,9 +85,8 @@ private: int64_t _latency; }; -Percentile::Percentile() : _combiner(NULL), _sampler(NULL) { - _combiner = new combiner_type; -} +Percentile::Percentile() + : _combiner(std::make_shared<combiner_type>()), _sampler(NULL) {} Percentile::~Percentile() { // Have to destroy sampler first to avoid the race between destruction and @@ -96,7 +95,6 @@ Percentile::~Percentile() { _sampler->destroy(); _sampler = NULL; } - delete _combiner; } Percentile::value_type Percentile::reset() { @@ -126,7 +124,7 @@ Percentile &Percentile::operator<<(int64_t latency) { } return *this; } - agent->merge_global(AddLatency(latency)); + 
agent->merge_global(AddLatency(latency), _combiner); return *this; } diff --git a/src/bvar/detail/percentile.h b/src/bvar/detail/percentile.h index 5fcc180a..186103b4 100644 --- a/src/bvar/detail/percentile.h +++ b/src/bvar/detail/percentile.h @@ -462,6 +462,7 @@ public: typedef AgentCombiner <GlobalPercentileSamples, ThreadLocalPercentileSamples, AddPercentileSamples> combiner_type; + typedef typename combiner_type::self_shared_type shared_combiner_type; typedef combiner_type::Agent agent_type; Percentile(); ~Percentile(); @@ -494,8 +495,8 @@ public: private: DISALLOW_COPY_AND_ASSIGN(Percentile); - combiner_type* _combiner; - sampler_type* _sampler; + shared_combiner_type _combiner; + sampler_type* _sampler; std::string _debug_name; }; diff --git a/src/bvar/recorder.h b/src/bvar/recorder.h index 9b73a19b..c2c18bd1 100644 --- a/src/bvar/recorder.h +++ b/src/bvar/recorder.h @@ -113,9 +113,10 @@ public: }; typedef detail::AgentCombiner<Stat, uint64_t, AddToStat> combiner_type; + typedef typename combiner_type::self_shared_type shared_combiner_type; typedef combiner_type::Agent agent_type; - IntRecorder() : _sampler(NULL) {} + IntRecorder() : _combiner(std::make_shared<combiner_type>()), _sampler(NULL) {} explicit IntRecorder(const butil::StringPiece& name) : _sampler(NULL) { expose(name); @@ -126,7 +127,7 @@ public: expose_as(prefix, name); } - ~IntRecorder() { + ~IntRecorder() override { hide(); if (_sampler) { _sampler->destroy(); @@ -138,19 +139,19 @@ public: IntRecorder& operator<<(int64_t/*note*/ sample); int64_t average() const { - return _combiner.combine_agents().get_average_int(); + return _combiner->combine_agents().get_average_int(); } double average(double) const { - return _combiner.combine_agents().get_average_double(); + return _combiner->combine_agents().get_average_double(); } Stat get_value() const { - return _combiner.combine_agents(); + return _combiner->combine_agents(); } Stat reset() { - return _combiner.reset_all_agents(); + return 
_combiner->reset_all_agents(); } AddStat op() const { return AddStat(); } @@ -160,7 +161,7 @@ public: os << get_value(); } - bool valid() const { return _combiner.valid(); } + bool valid() const { return _combiner->valid(); } sampler_type* get_sampler() { if (NULL == _sampler) { @@ -230,7 +231,7 @@ private: } private: - combiner_type _combiner; + shared_combiner_type _combiner; sampler_type* _sampler; std::string _debug_name; }; @@ -258,7 +259,7 @@ inline IntRecorder& IntRecorder::operator<<(int64_t sample) { << (void*)this << ") " << reason; } } - agent_type* agent = _combiner.get_or_create_tls_agent(); + agent_type* agent = _combiner->get_or_create_tls_agent(); if (BAIDU_UNLIKELY(!agent)) { LOG(FATAL) << "Fail to create agent"; return *this; @@ -276,7 +277,7 @@ inline IntRecorder& IntRecorder::operator<<(int64_t sample) { // Although agent->element might have been cleared at this // point, it is just OK because the very value is 0 in // this case - agent->combiner->commit_and_clear(agent); + _combiner->commit_and_clear(agent); sum = 0; num = 0; n = 0; diff --git a/src/bvar/reducer.h b/src/bvar/reducer.h index fbd4fa78..ccf78054 100644 --- a/src/bvar/reducer.h +++ b/src/bvar/reducer.h @@ -69,13 +69,13 @@ template <typename T, typename Op, typename InvOp = detail::VoidOp> class Reducer : public Variable { public: typedef typename detail::AgentCombiner<T, T, Op> combiner_type; + typedef typename combiner_type::self_shared_type shared_combiner_type; typedef typename combiner_type::Agent agent_type; typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type; class SeriesSampler : public detail::Sampler { public: SeriesSampler(Reducer* owner, const Op& op) : _owner(owner), _series(op) {} - ~SeriesSampler() {} void take_sample() override { _series.append(_owner->get_value()); } void describe(std::ostream& os) { _series.describe(os, NULL); } private: @@ -85,16 +85,12 @@ public: public: // The `identify' must satisfy: identity Op a == a - Reducer(typename 
butil::add_cr_non_integral<T>::type identity = T(), - const Op& op = Op(), - const InvOp& inv_op = InvOp()) - : _combiner(identity, identity, op) - , _sampler(NULL) - , _series_sampler(NULL) - , _inv_op(inv_op) { - } + explicit Reducer(typename butil::add_cr_non_integral<T>::type identity = T(), + const Op& op = Op(), const InvOp& inv_op = InvOp()) + : _combiner(std::make_shared<combiner_type>(identity, identity, op)) + , _sampler(NULL) , _series_sampler(NULL) , _inv_op(inv_op) {} - ~Reducer() { + ~Reducer() override { // Calling hide() manually is a MUST required by Variable. hide(); if (_sampler) { @@ -119,13 +115,13 @@ public: << "You should not call Reducer<" << butil::class_name_str<T>() << ", " << butil::class_name_str<Op>() << ">::get_value() when a" << " Window<> is used because the operator does not have inverse."; - return _combiner.combine_agents(); + return _combiner->combine_agents(); } // Reset the reduced value to T(). // Returns the reduced value before reset. - T reset() { return _combiner.reset_all_agents(); } + T reset() { return _combiner->reset_all_agents(); } void describe(std::ostream& os, bool quote_string) const override { if (butil::is_same<T, std::string>::value && quote_string) { @@ -140,10 +136,10 @@ public: #endif // True if this reducer is constructed successfully. - bool valid() const { return _combiner.valid(); } + bool valid() const { return _combiner->valid(); } // Get instance of Op. 
- const Op& op() const { return _combiner.op(); } + const Op& op() const { return _combiner->op(); } const InvOp& inv_op() const { return _inv_op; } sampler_type* get_sampler() { @@ -174,14 +170,14 @@ protected: !butil::is_same<InvOp, detail::VoidOp>::value && !butil::is_same<T, std::string>::value && FLAGS_save_series) { - _series_sampler = new SeriesSampler(this, _combiner.op()); + _series_sampler = new SeriesSampler(this, _combiner->op()); _series_sampler->schedule(); } return rc; } private: - combiner_type _combiner; + shared_combiner_type _combiner; sampler_type* _sampler; SeriesSampler* _series_sampler; InvOp _inv_op; @@ -191,12 +187,12 @@ template <typename T, typename Op, typename InvOp> inline Reducer<T, Op, InvOp>& Reducer<T, Op, InvOp>::operator<<( typename butil::add_cr_non_integral<T>::type value) { // It's wait-free for most time - agent_type* agent = _combiner.get_or_create_tls_agent(); + agent_type* agent = _combiner->get_or_create_tls_agent(); if (__builtin_expect(!agent, 0)) { LOG(FATAL) << "Fail to create agent"; return *this; } - agent->element.modify(_combiner.op(), value); + agent->element.modify(_combiner->op(), value); return *this; } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org