This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0e883e9d2b1ef34802f1ba6384351cb0a61cc544
Author: HappenLee <[email protected]>
AuthorDate: Mon Jul 10 16:16:05 2023 +0800

    [Profile](runtimefilter) fix merge time of runtime filter (#21654)
---
 be/src/exprs/bloom_filter_func.h      | 1 +
 be/src/runtime/runtime_filter_mgr.cpp | 7 +++++--
 be/src/runtime/runtime_filter_mgr.h   | 3 +--
 3 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/be/src/exprs/bloom_filter_func.h b/be/src/exprs/bloom_filter_func.h
index e6fed96b58..2d2b7664ac 100644
--- a/be/src/exprs/bloom_filter_func.h
+++ b/be/src/exprs/bloom_filter_func.h
@@ -137,6 +137,7 @@ public:
         if (_inited) {
             return Status::OK();
         }
+        // TODO: really need the lock?
         std::lock_guard<std::mutex> l(_lock);
         if (_inited) {
             return Status::OK();
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 14ee165684..4380716671 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -311,6 +311,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
     std::shared_ptr<RuntimeFilterCntlVal> cntVal;
     int merged_size = 0;
+    int64_t merge_time = 0;
     {
         int64_t start_merge = MonotonicMillis();
         std::lock_guard<std::mutex> guard(_filter_map_mutex);
@@ -334,9 +335,11 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
         // TODO: avoid log when we had acquired a lock
         VLOG_ROW << "merge size:" << merged_size << ":" << 
cntVal->producer_size;
         DCHECK_LE(merged_size, cntVal->producer_size);
-        _merge_timer += (MonotonicMillis() - start_merge);
+        iter->second->merge_time += (MonotonicMillis() - start_merge);
         if (merged_size < cntVal->producer_size) {
             return Status::OK();
+        } else {
+            merge_time = iter->second->merge_time;
         }
     }
 
@@ -374,7 +377,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
                 rpc_contexts[cur]->request.set_filter_id(request->filter_id());
                 
rpc_contexts[cur]->request.set_is_pipeline(request->has_is_pipeline() &&
                                                            
request->is_pipeline());
-                rpc_contexts[cur]->request.set_merge_time(_merge_timer);
+                rpc_contexts[cur]->request.set_merge_time(merge_time);
                 *rpc_contexts[cur]->request.mutable_query_id() = 
request->query_id();
                 if (has_attachment) {
                     
rpc_contexts[cur]->cntl.request_attachment().append(request_attachment);
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index f3cb32813f..7d7e7a0ba1 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -139,7 +139,7 @@ public:
     UniqueId instance_id() const { return _fragment_instance_id; }
 
     struct RuntimeFilterCntlVal {
-        int64_t create_time;
+        int64_t merge_time;
         int producer_size;
         TRuntimeFilterDesc runtime_filter_desc;
         std::vector<doris::TRuntimeFilterTargetParams> target_info;
@@ -173,7 +173,6 @@ private:
     std::map<std::string, std::shared_ptr<RuntimeFilterCntlVal>> _filter_map;
     RuntimeState* _state;
     bool _opt_remote_rf = true;
-    int64_t _merge_timer = 0;
 };
 
 // RuntimeFilterMergeController has a map query-id -> entity


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to