This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit df20012e93f4cf97a1f37c99e284612a76be2253
Author: Mryange <[email protected]>
AuthorDate: Thu Jul 13 10:44:20 2023 +0800

    Revert "[imporve](bloomfilter) refactor runtime_filter_mgr with bloomfilter 
(#21715)" (#21763)
    
    This reverts commit 925da90480f60afc0e5333a536d41e004234874e.
---
 be/src/runtime/runtime_filter_mgr.cpp | 38 ++++++++++++++++-------------------
 be/src/runtime/runtime_filter_mgr.h   | 13 ++++++------
 2 files changed, 23 insertions(+), 28 deletions(-)

diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 6614532d85..4380716671 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -39,7 +39,6 @@
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "util/brpc_client_cache.h"
-#include "util/spinlock.h"
 
 namespace doris {
 
@@ -209,7 +208,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
         const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* 
query_options,
         const std::vector<doris::TRuntimeFilterTargetParams>* target_info,
         const int producer_size) {
-    std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
+    std::lock_guard<std::mutex> guard(_filter_map_mutex);
     std::shared_ptr<RuntimeFilterCntlVal> cntVal = 
std::make_shared<RuntimeFilterCntlVal>();
     // runtime_filter_desc and target will be released,
     // so we need to copy to cntVal
@@ -220,10 +219,10 @@ Status 
RuntimeFilterMergeControllerEntity::_init_with_desc(
     cntVal->filter =
             cntVal->pool->add(new IRuntimeFilter(_state, 
&_state->get_query_ctx()->obj_pool));
 
-    auto filter_id = runtime_filter_desc->filter_id;
+    std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
     // LOG(INFO) << "entity filter id:" << filter_id;
     cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, 
query_options, -1, false);
-    _filter_map.emplace(filter_id, CntlValwithLock {cntVal, 
std::make_unique<SpinLock>()});
+    _filter_map.emplace(filter_id, cntVal);
     return Status::OK();
 }
 
@@ -231,7 +230,7 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
         const TRuntimeFilterDesc* runtime_filter_desc, const TQueryOptions* 
query_options,
         const std::vector<doris::TRuntimeFilterTargetParamsV2>* targetv2_info,
         const int producer_size) {
-    std::unique_lock<std::shared_mutex> guard(_filter_map_mutex);
+    std::lock_guard<std::mutex> guard(_filter_map_mutex);
     std::shared_ptr<RuntimeFilterCntlVal> cntVal = 
std::make_shared<RuntimeFilterCntlVal>();
     // runtime_filter_desc and target will be released,
     // so we need to copy to cntVal
@@ -242,10 +241,10 @@ Status 
RuntimeFilterMergeControllerEntity::_init_with_desc(
     cntVal->filter =
             cntVal->pool->add(new IRuntimeFilter(_state, 
&_state->get_query_ctx()->obj_pool));
 
-    auto filter_id = runtime_filter_desc->filter_id;
+    std::string filter_id = std::to_string(runtime_filter_desc->filter_id);
     // LOG(INFO) << "entity filter id:" << filter_id;
     cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, 
query_options);
-    _filter_map.emplace(filter_id, CntlValwithLock {cntVal, 
std::make_unique<SpinLock>()});
+    _filter_map.emplace(filter_id, cntVal);
     return Status::OK();
 }
 
@@ -313,37 +312,34 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
     std::shared_ptr<RuntimeFilterCntlVal> cntVal;
     int merged_size = 0;
     int64_t merge_time = 0;
-    int64_t start_merge = MonotonicMillis();
-    auto filter_id = request->filter_id();
-    std::map<int, CntlValwithLock>::iterator iter;
     {
-        std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
-        iter = _filter_map.find(filter_id);
+        int64_t start_merge = MonotonicMillis();
+        std::lock_guard<std::mutex> guard(_filter_map_mutex);
+        auto iter = _filter_map.find(std::to_string(request->filter_id()));
         VLOG_ROW << "recv filter id:" << request->filter_id() << " " << 
request->ShortDebugString();
         if (iter == _filter_map.end()) {
             return Status::InvalidArgument("unknown filter id {}",
                                            
std::to_string(request->filter_id()));
         }
-    }
-    // iter->second = pair{CntlVal,SpinLock}
-    cntVal = iter->second.first;
-    {
-        std::lock_guard<SpinLock> l(*iter->second.second);
+        cntVal = iter->second;
+        if (auto bf = cntVal->filter->get_bloomfilter()) {
+            RETURN_IF_ERROR(bf->init_with_fixed_length());
+        }
         MergeRuntimeFilterParams params(request, attach_data);
-        ObjectPool* pool = cntVal->pool.get();
+        ObjectPool* pool = iter->second->pool.get();
         RuntimeFilterWrapperHolder holder;
         RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(_state, &params, pool, 
holder.getHandle()));
         RETURN_IF_ERROR(cntVal->filter->merge_from(holder.getHandle()->get()));
-        cntVal->arrive_id.insert(UniqueId(request->fragment_id()));
+        cntVal->arrive_id.insert(UniqueId(request->fragment_id()).to_string());
         merged_size = cntVal->arrive_id.size();
         // TODO: avoid log when we had acquired a lock
         VLOG_ROW << "merge size:" << merged_size << ":" << 
cntVal->producer_size;
         DCHECK_LE(merged_size, cntVal->producer_size);
-        cntVal->merge_time += (MonotonicMillis() - start_merge);
+        iter->second->merge_time += (MonotonicMillis() - start_merge);
         if (merged_size < cntVal->producer_size) {
             return Status::OK();
         } else {
-            merge_time = cntVal->merge_time;
+            merge_time = iter->second->merge_time;
         }
     }
 
diff --git a/be/src/runtime/runtime_filter_mgr.h 
b/be/src/runtime/runtime_filter_mgr.h
index 2048229cd2..7d7e7a0ba1 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -26,7 +26,6 @@
 #include <map>
 #include <memory>
 #include <mutex>
-#include <shared_mutex>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -146,12 +145,12 @@ public:
         std::vector<doris::TRuntimeFilterTargetParams> target_info;
         std::vector<doris::TRuntimeFilterTargetParamsV2> targetv2_info;
         IRuntimeFilter* filter;
-        std::unordered_set<UniqueId> arrive_id; // fragment_instance_id ?
+        std::unordered_set<std::string> arrive_id; // fragment_instance_id ?
         std::shared_ptr<ObjectPool> pool;
     };
 
 public:
-    RuntimeFilterCntlVal* get_filter(int id) { return 
_filter_map[id].first.get(); }
+    RuntimeFilterCntlVal* get_filter(int id) { return 
_filter_map[std::to_string(id)].get(); }
 
 private:
     Status _init_with_desc(const TRuntimeFilterDesc* runtime_filter_desc,
@@ -167,11 +166,11 @@ private:
     UniqueId _query_id;
     UniqueId _fragment_instance_id;
     // protect _filter_map
-    std::shared_mutex _filter_map_mutex;
+    std::mutex _filter_map_mutex;
     std::shared_ptr<MemTracker> _mem_tracker;
-    using CntlValwithLock =
-            std::pair<std::shared_ptr<RuntimeFilterCntlVal>, 
std::unique_ptr<SpinLock>>;
-    std::map<int, CntlValwithLock> _filter_map;
+    // TODO: convert filter id to i32
+    // filter-id -> val
+    std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
     RuntimeState* _state;
     bool _opt_remote_rf = true;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to