github-actions[bot] commented on code in PR #64040:
URL: https://github.com/apache/doris/pull/64040#discussion_r3489079965
##########
thirdparty/patches/brpc-1.4.0-fix-agent-combiner-thread-safety.patch:
##########
@@ -0,0 +1,439 @@
+diff --git a/src/bvar/detail/combiner.h b/src/bvar/detail/combiner.h
+index 6a6ab80..65ed9a1 100644
+--- a/src/bvar/detail/combiner.h
++++ b/src/bvar/detail/combiner.h
+@@ -22,6 +22,7 @@
+
+ #include <string> // std::string
+ #include <vector> // std::vector
++#include <memory>
+ #include "butil/atomicops.h" // butil::atomic
+ #include "butil/scoped_lock.h" // BAIDU_SCOPED_LOCK
+ #include "butil/type_traits.h" // butil::add_cr_non_integral
+@@ -42,7 +43,6 @@ public:
+ typedef typename Combiner::Agent agent_type;
+
+ GlobalValue(agent_type* a, Combiner* c) : _a(a), _c(c) {}
+- ~GlobalValue() {}
+
+ // Call this method to unlock tls element and lock the combiner.
+ // Unlocking tls element avoids potential deadlock with
+@@ -113,7 +113,7 @@ class ElementContainer<
+ T, typename butil::enable_if<is_atomical<T>::value>::type> {
+ public:
+ // We don't need any memory fencing here, every op is relaxed.
+-
++
+ inline void load(T* out) {
+ *out = _value.load(butil::memory_order_relaxed);
+ }
+@@ -153,24 +153,26 @@ private:
+ };
+
+ template <typename ResultTp, typename ElementTp, typename BinaryOp>
+-class AgentCombiner {
++class AgentCombiner
++ : public std::enable_shared_from_this<AgentCombiner<ResultTp, ElementTp,
BinaryOp>> {
++
+ public:
+ typedef ResultTp result_type;
+ typedef ElementTp element_type;
+ typedef AgentCombiner<ResultTp, ElementTp, BinaryOp> self_type;
++ typedef std::shared_ptr<self_type> self_shared_type;
++ typedef std::weak_ptr<self_type> self_weak_type;
+ friend class GlobalValue<self_type>;
+-
+- struct Agent : public butil::LinkNode<Agent> {
+- Agent() : combiner(NULL) {}
+
++ struct Agent : public butil::LinkNode<Agent> {
+ ~Agent() {
+- if (combiner) {
+- combiner->commit_and_erase(this);
+- combiner = NULL;
++ self_shared_type c = combiner.lock();
++ if (NULL != c) {
++ c->commit_and_erase(this);
+ }
+ }
+-
+- void reset(const ElementTp& val, self_type* c) {
++
++ void reset(const ElementTp& val, const self_shared_type& c) {
+ combiner = c;
+ element.store(val);
+ }
+@@ -181,11 +183,11 @@ friend class GlobalValue<self_type>;
+ // void operator()(GlobalValue<Combiner> & global_value,
+ // ElementTp & local_value) const {
+ // if (test_for_merging(local_value)) {
+- //
++ //
+ // // Unlock tls element and lock combiner. Obviously
+ // // tls element can be changed during lock().
+ // ResultTp* g = global_value.lock();
+- //
++ //
+ // // *g and local_value are not changed provided
+ // // merge_global is called from the thread owning
+ // // the agent.
+@@ -200,16 +202,23 @@ friend class GlobalValue<self_type>;
+ // ...
+ // }
+ // };
+- //
++ //
+ // NOTE: Only available to non-atomic types.
+ template <typename Op>
+- void merge_global(const Op &op) {
+- GlobalValue<self_type> g(this, combiner);
+- element.merge_global(op, g);
++ void merge_global(const Op &op, self_shared_type c = NULL) {
++ if (NULL == c) {
++ c = combiner.lock();
++ }
++ if (NULL != c) {
++ GlobalValue<self_type> g(this, c.get());
++ element.merge_global(op, g);
++ }
+ }
+
+- self_type *combiner;
+ ElementContainer<ElementTp> element;
++ private:
++ friend class AgentCombiner<ResultTp, ElementTp, BinaryOp>;
++ self_weak_type combiner;
+ };
+
+ typedef detail::AgentGroup<Agent> AgentGroup;
+@@ -231,7 +240,7 @@ friend class GlobalValue<self_type>;
+ _id = -1;
+ }
+ }
+-
++
+ // [Threadsafe] May be called from anywhere
+ ResultTp combine_agents() const {
+ ElementTp tls_value;
+@@ -245,10 +254,10 @@ friend class GlobalValue<self_type>;
+ return ret;
+ }
+
+- typename butil::add_cr_non_integral<ElementTp>::type element_identity()
const
+- { return _element_identity; }
+- typename butil::add_cr_non_integral<ResultTp>::type result_identity()
const
+- { return _result_identity; }
++ typename butil::add_cr_non_integral<ElementTp>::type
++ element_identity() const { return _element_identity; }
++ typename butil::add_cr_non_integral<ResultTp>::type
++ result_identity() const { return _result_identity; }
+
+ // [Threadsafe] May be called from anywhere.
+ ResultTp reset_all_agents() {
+@@ -265,7 +274,7 @@ friend class GlobalValue<self_type>;
+ }
+
+ // Always called from the thread owning the agent.
+- void commit_and_erase(Agent *agent) {
++ void commit_and_erase(Agent* agent) {
+ if (NULL == agent) {
+ return;
+ }
+@@ -279,7 +288,7 @@ friend class GlobalValue<self_type>;
+ }
+
+ // Always called from the thread owning the agent
+- void commit_and_clear(Agent *agent) {
++ void commit_and_clear(Agent* agent) {
+ if (NULL == agent) {
+ return;
+ }
+@@ -290,7 +299,7 @@ friend class GlobalValue<self_type>;
+ }
+
+ // We need this function to be as fast as possible.
+- inline Agent* get_or_create_tls_agent() {
++ Agent* get_or_create_tls_agent() {
+ Agent* agent = AgentGroup::get_tls_agent(_id);
+ if (!agent) {
+ // Create the agent
+@@ -300,10 +309,10 @@ friend class GlobalValue<self_type>;
+ return NULL;
+ }
+ }
+- if (agent->combiner) {
++ if (!agent->combiner.expired()) {
+ return agent;
+ }
+- agent->reset(_element_identity, this);
++ agent->reset(_element_identity, this->shared_from_this());
+ // TODO: Is uniqueness-checking necessary here?
+ {
+ butil::AutoLock guard(_lock);
+@@ -317,8 +326,7 @@ friend class GlobalValue<self_type>;
+ // reseting agents is must because the agent object may be reused.
+ // Set element to be default-constructed so that if it's non-pod,
+ // internal allocations should be released.
+- for (butil::LinkNode<Agent>*
+- node = _agents.head(); node != _agents.end();) {
++ for (butil::LinkNode<Agent>* node = _agents.head(); node !=
_agents.end();) {
Review Comment:
This backport still leaves `~AgentCombiner()` calling `clear_all_agents()`,
so the weak-pointer lifetime fix is incomplete. Once the last `shared_ptr`
owner is released and the combiner's destructor begins running,
`Agent::~Agent()` on a concurrently exiting thread sees `combiner.lock()`
return an empty pointer and therefore skips `commit_and_erase()`. That
thread then deletes its `ThreadBlock`, but the agent's link node can still
remain in this combiner's `_agents` list. The `clear_all_agents()` loop here
then walks `_agents` and dereferences `node->value()` / `node->next()` in TLS
storage that may already have been freed, which is a use-after-free. Upstream
brpc's current version of this fix explicitly stops calling `clear_all_agents()`
from `~AgentCombiner()` and documents this exact race. This backport should
include that destructor change too, rather than only switching the agent's
combiner back-pointer to `weak_ptr`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]