This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 4b31e52b249 [enhancement](runtimefilter) fix potential core in runtime 
filter sync filter size (#38058) (#38093)
4b31e52b249 is described below

commit 4b31e52b24981f9d8884756d091043c62c0b4fc0
Author: yiguolei <[email protected]>
AuthorDate: Thu Jul 18 23:11:26 2024 +0800

    [enhancement](runtimefilter) fix potential core in runtime filter sync 
filter size (#38058) (#38093)
    
    pick #38058
    
    ## Proposed changes
    IRuntimeFilter maybe deconstructed before the rpc finished, so that
    could not use a raw pointer in closure. Has to use the context's shared
    ptr.
    
    ---------
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/exprs/runtime_filter.cpp                   | 27 ++++++++++++++---------
 be/src/exprs/runtime_filter.h                     |  4 ++--
 be/src/vec/runtime/shared_hash_table_controller.h |  4 ++--
 3 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index fb24cf1eb0d..1bf921b67ac 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -952,7 +952,7 @@ private:
     RuntimeFilterType _filter_type;
     int32_t _max_in_num = -1;
 
-    SharedRuntimeFilterContext _context;
+    RuntimeFilterContextSPtr _context;
     uint32_t _filter_id;
 };
 
@@ -965,7 +965,7 @@ Status IRuntimeFilter::create(RuntimeFilterParamsContext* 
state, ObjectPool* poo
     return (*res)->init_with_desc(desc, query_options, node_id, 
build_bf_exactly);
 }
 
-SharedRuntimeFilterContext& IRuntimeFilter::get_shared_context_ref() {
+RuntimeFilterContextSPtr& IRuntimeFilter::get_shared_context_ref() {
     return _wrapper->_context;
 }
 
@@ -1033,14 +1033,15 @@ Status IRuntimeFilter::publish(bool publish_local) {
 class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
                                                   
DummyBrpcCallback<PSendFilterSizeResponse>> {
     std::shared_ptr<pipeline::Dependency> _dependency;
-    IRuntimeFilter* _filter;
+    RuntimeFilterContextSPtr _rf_context;
+    std::string _rf_debug_info;
     using Base =
             AutoReleaseClosure<PSendFilterSizeRequest, 
DummyBrpcCallback<PSendFilterSizeResponse>>;
     ENABLE_FACTORY_CREATOR(SyncSizeClosure);
 
     void _process_if_rpc_failed() override {
         ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
-        LOG(WARNING) << "sync filter size meet rpc error, filter=" << 
_filter->debug_string();
+        LOG(WARNING) << "sync filter size meet rpc error, filter=" << 
_rf_debug_info;
         Base::_process_if_rpc_failed();
     }
 
@@ -1048,10 +1049,9 @@ class SyncSizeClosure : public 
AutoReleaseClosure<PSendFilterSizeRequest,
         ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
         if (status.is<ErrorCode::END_OF_FILE>()) {
             // rf merger backend may finished before rf's send_filter_size, we 
just ignore filter in this case.
-            _filter->set_ignored();
+            _rf_context->ignored = true;
         } else {
-            LOG(WARNING) << "sync filter size meet error status, filter="
-                         << _filter->debug_string();
+            LOG(WARNING) << "sync filter size meet error status, filter=" << 
_rf_debug_info;
             Base::_process_if_meet_error_status(status);
         }
     }
@@ -1059,8 +1059,12 @@ class SyncSizeClosure : public 
AutoReleaseClosure<PSendFilterSizeRequest,
 public:
     SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
                     
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
-                    std::shared_ptr<pipeline::Dependency> dependency, 
IRuntimeFilter* filter)
-            : Base(req, callback), _dependency(std::move(dependency)), 
_filter(filter) {}
+                    std::shared_ptr<pipeline::Dependency> dependency,
+                    RuntimeFilterContextSPtr rf_context, std::string_view 
rf_debug_info)
+            : Base(req, callback),
+              _dependency(std::move(dependency)),
+              _rf_context(rf_context),
+              _rf_debug_info(rf_debug_info) {}
 };
 
 Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t 
local_filter_size) {
@@ -1103,7 +1107,10 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* 
state, uint64_t local_filt
 
     auto request = std::make_shared<PSendFilterSizeRequest>();
     auto callback = 
DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
-    auto closure = SyncSizeClosure::create_unique(request, callback, 
_dependency, this);
+    // IRuntimeFilter maybe deconstructed before the rpc finished, so that 
could not use
+    // a raw pointer in closure. Has to use the context's shared ptr.
+    auto closure = SyncSizeClosure::create_unique(request, callback, 
_dependency,
+                                                  _wrapper->_context, 
this->debug_string());
     auto* pquery_id = request->mutable_query_id();
     pquery_id->set_hi(_state->query_id.hi());
     pquery_id->set_lo(_state->query_id.lo());
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index e8c5bbfd872..7b57a86a94c 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -65,7 +65,7 @@ class TQueryOptions;
 namespace vectorized {
 class VExpr;
 class VExprContext;
-struct SharedRuntimeFilterContext;
+struct RuntimeFilterContextSPtr;
 } // namespace vectorized
 
 namespace pipeline {
@@ -221,7 +221,7 @@ public:
                          const RuntimeFilterRole role, int node_id, 
IRuntimeFilter** res,
                          bool build_bf_exactly = false, bool need_local_merge 
= false);
 
-    SharedRuntimeFilterContext& get_shared_context_ref();
+    RuntimeFilterContextSPtr& get_shared_context_ref();
 
     // insert data to build filter
     void insert_batch(vectorized::ColumnPtr column, size_t start);
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h 
b/be/src/vec/runtime/shared_hash_table_controller.h
index 8fe46b97b85..b685b984bf3 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -48,7 +48,7 @@ struct RuntimeFilterContext {
     bool ignored = false;
 };
 
-using SharedRuntimeFilterContext = std::shared_ptr<RuntimeFilterContext>;
+using RuntimeFilterContextSPtr = std::shared_ptr<RuntimeFilterContext>;
 
 namespace vectorized {
 
@@ -63,7 +63,7 @@ struct SharedHashTableContext {
     std::shared_ptr<void> hash_table_variants;
     std::shared_ptr<Block> block;
     std::shared_ptr<std::vector<uint32_t>> build_indexes_null;
-    std::map<int, SharedRuntimeFilterContext> runtime_filters;
+    std::map<int, RuntimeFilterContextSPtr> runtime_filters;
     std::atomic<bool> signaled = false;
     bool short_circuit_for_null_in_probe_side = false;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to