This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2573150f6df7c5eaf701d1d11d07fd975ac7a724
Author: yiguolei <[email protected]>
AuthorDate: Thu Feb 8 22:15:32 2024 +0800

    [refactor](runtime filter) do not wait runtime filter rpc finished when 
hash node or pipeline finished (#30970)
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/common/logging.h                    |  5 +++
 be/src/exprs/runtime_filter.h              |  8 ----
 be/src/exprs/runtime_filter_rpc.cpp        | 71 +++++++++---------------------
 be/src/exprs/runtime_filter_slots.h        | 19 --------
 be/src/pipeline/exec/hashjoin_build_sink.h |  1 -
 be/src/runtime/thread_context.h            |  5 ---
 be/src/vec/exec/join/vhash_join_node.cpp   |  3 --
 be/src/vec/exec/join/vhash_join_node.h     |  7 ---
 8 files changed, 25 insertions(+), 94 deletions(-)

diff --git a/be/src/common/logging.h b/be/src/common/logging.h
index 672edb84e6a..36e5fff9904 100644
--- a/be/src/common/logging.h
+++ b/be/src/common/logging.h
@@ -110,6 +110,11 @@ private:
     google::LogMessage _msg;
 };
 
+// Very very important!!!!
+// Never define LOG_DEBUG or LOG_TRACE. because the tagged logging method will
+// always generate the string and then check the log level, so its performance is bad.
+// glog's original method will first check the log level; if it is not satisfied,
+// the log message is not generated.
 #define LOG_INFO TaggableLogger(__FILE__, __LINE__, google::GLOG_INFO)
 #define LOG_WARNING TaggableLogger(__FILE__, __LINE__, google::GLOG_WARNING)
 #define LOG_ERROR TaggableLogger(__FILE__, __LINE__, google::GLOG_ERROR)
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index f68c0ec250c..6d69302c2ea 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -296,8 +296,6 @@ public:
     // for ut
     bool is_bloomfilter();
 
-    bool is_finish_rpc();
-
     Status join_rpc();
 
     // async push runtimefilter to remote node
@@ -419,15 +417,9 @@ protected:
     // used for await or signal
     std::mutex _inner_mutex;
     std::condition_variable _inner_cv;
-
     bool _is_push_down = false;
-
     TExpr _probe_expr;
 
-    struct RPCContext;
-
-    std::shared_ptr<RPCContext> _rpc_context;
-
     /// Time in ms (from MonotonicMillis()), that the filter was registered.
     const int64_t registration_time_;
     /// runtime filter wait time will be ignored if wait_infinitely is true
diff --git a/be/src/exprs/runtime_filter_rpc.cpp 
b/be/src/exprs/runtime_filter_rpc.cpp
index 4ca4a78247c..9c51b7217c1 100644
--- a/be/src/exprs/runtime_filter_rpc.cpp
+++ b/be/src/exprs/runtime_filter_rpc.cpp
@@ -35,23 +35,13 @@
 
 #include "common/logging.h"
 #include "util/brpc_client_cache.h"
+#include "util/ref_count_closure.h"
 
 namespace doris {
 
-struct IRuntimeFilter::RPCContext {
-    PMergeFilterRequest request;
-    PMergeFilterResponse response;
-    brpc::Controller cntl;
-    brpc::CallId cid;
-    bool is_finished = false;
-
-    static void finish(std::shared_ptr<RPCContext> ctx) { ctx->is_finished = 
true; }
-};
-
 Status IRuntimeFilter::push_to_remote(RuntimeFilterParamsContext* state,
                                       const TNetworkAddress* addr, bool 
opt_remote_rf) {
     DCHECK(is_producer());
-    DCHECK(_rpc_context == nullptr);
     std::shared_ptr<PBackendService_Stub> stub(
             state->exec_env->brpc_internal_client_cache()->get_client(*addr));
     if (!stub) {
@@ -59,64 +49,43 @@ Status 
IRuntimeFilter::push_to_remote(RuntimeFilterParamsContext* state,
                 fmt::format("Get rpc stub failed, host={},  port=", 
addr->hostname, addr->port);
         return Status::InternalError(msg);
     }
-    _rpc_context = std::make_shared<IRuntimeFilter::RPCContext>();
+
+    auto merge_filter_request = std::make_shared<PMergeFilterRequest>();
+    auto merge_filter_callback = 
DummyBrpcCallback<PMergeFilterResponse>::create_shared();
+    auto merge_filter_closure =
+            AutoReleaseClosure<PMergeFilterRequest, 
DummyBrpcCallback<PMergeFilterResponse>>::
+                    create_unique(merge_filter_request, merge_filter_callback);
     void* data = nullptr;
     int len = 0;
 
-    auto pquery_id = _rpc_context->request.mutable_query_id();
+    auto pquery_id = merge_filter_request->mutable_query_id();
     pquery_id->set_hi(_state->query_id.hi());
     pquery_id->set_lo(_state->query_id.lo());
 
-    auto pfragment_instance_id = 
_rpc_context->request.mutable_fragment_instance_id();
+    auto pfragment_instance_id = 
merge_filter_request->mutable_fragment_instance_id();
     pfragment_instance_id->set_hi(state->fragment_instance_id().hi());
     pfragment_instance_id->set_lo(state->fragment_instance_id().lo());
 
-    _rpc_context->request.set_filter_id(_filter_id);
-    _rpc_context->request.set_opt_remote_rf(opt_remote_rf);
-    _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec);
-    _rpc_context->cntl.set_timeout_ms(wait_time_ms());
-    _rpc_context->cid = _rpc_context->cntl.call_id();
+    merge_filter_request->set_filter_id(_filter_id);
+    merge_filter_request->set_opt_remote_rf(opt_remote_rf);
+    merge_filter_request->set_is_pipeline(state->enable_pipeline_exec);
+    merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());
 
-    Status serialize_status = serialize(&_rpc_context->request, &data, &len);
+    Status serialize_status = serialize(merge_filter_request.get(), &data, 
&len);
     if (serialize_status.ok()) {
-        VLOG_NOTICE << "Producer:" << _rpc_context->request.ShortDebugString() 
<< addr->hostname
+        VLOG_NOTICE << "Producer:" << merge_filter_request->ShortDebugString() 
<< addr->hostname
                     << ":" << addr->port;
         if (len > 0) {
             DCHECK(data != nullptr);
-            _rpc_context->cntl.request_attachment().append(data, len);
+            merge_filter_callback->cntl_->request_attachment().append(data, 
len);
         }
 
-        stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, 
&_rpc_context->response,
-                           brpc::NewCallback(RPCContext::finish, 
_rpc_context));
-
-    } else {
-        // we should reset context
-        _rpc_context.reset();
+        stub->merge_filter(merge_filter_closure->cntl_.get(), 
merge_filter_closure->request_.get(),
+                           merge_filter_closure->response_.get(), 
merge_filter_closure.get());
+        // the closure will be released by brpc during closure->Run.
+        merge_filter_closure.release();
     }
     return serialize_status;
 }
 
-bool IRuntimeFilter::is_finish_rpc() {
-    if (_rpc_context == nullptr) {
-        return true;
-    }
-    return _rpc_context->is_finished;
-}
-
-Status IRuntimeFilter::join_rpc() {
-    if (!is_producer()) {
-        return Status::InternalError("RuntimeFilter::join_rpc only called when 
rf is producer.");
-    }
-    if (_rpc_context != nullptr) {
-        brpc::Join(_rpc_context->cid);
-        if (_rpc_context->cntl.Failed()) {
-            // reset stub cache
-            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
-                    _rpc_context->cntl.remote_side());
-            return Status::InternalError("RuntimeFilter::join_rpc meet rpc 
error, msg={}.",
-                                         _rpc_context->cntl.ErrorText());
-        }
-    }
-    return Status::OK();
-}
 } // namespace doris
diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index e1a1f871b95..3dcf84ace08 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -184,25 +184,6 @@ public:
         }
     }
 
-    bool ready_finish_publish() {
-        for (auto& pair : _runtime_filters) {
-            for (auto* filter : pair.second) {
-                if (!filter->is_finish_rpc()) {
-                    return false;
-                }
-            }
-        }
-        return true;
-    }
-
-    void finish_publish() {
-        for (auto& pair : _runtime_filters) {
-            for (auto* filter : pair.second) {
-                static_cast<void>(filter->join_rpc());
-            }
-        }
-    }
-
     // publish runtime filter
     Status publish(bool publish_local = false) {
         for (auto& pair : _runtime_filters) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index efc0a46f3ba..c3d6038b3eb 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -41,7 +41,6 @@ class HashJoinBuildSink final : public 
StreamingOperator<vectorized::HashJoinNod
 public:
     HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node);
     bool can_write() override { return _node->can_sink_write(); }
-    bool is_pending_finish() const override { return 
!_node->ready_for_finish(); }
 };
 
 class HashJoinBuildSinkOperatorX;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index cbe1a19cae6..85ca125f36f 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -144,25 +144,21 @@ public:
                 << ", attach mem tracker label: " << mem_tracker->label();
 #endif
         _task_id = task_id;
-        _fragment_instance_id = fragment_instance_id;
         thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, 
fragment_instance_id);
     }
 
     void detach_task() {
         _task_id = TUniqueId();
-        _fragment_instance_id = TUniqueId();
         thread_mem_tracker_mgr->detach_limiter_tracker();
     }
 
     [[nodiscard]] const TUniqueId& task_id() const { return _task_id; }
-    [[nodiscard]] const TUniqueId& fragment_instance_id() const { return 
_fragment_instance_id; }
 
     static std::string get_thread_id() {
         std::stringstream ss;
         ss << std::this_thread::get_id();
         return ss.str();
     }
-
     // After thread_mem_tracker_mgr is initialized, the current thread Hook 
starts to
     // consume/release mem_tracker.
     // Note that the use of shared_ptr will cause a crash. The guess is that 
there is an
@@ -184,7 +180,6 @@ public:
 
 private:
     TUniqueId _task_id;
-    TUniqueId _fragment_instance_id;
 };
 
 class ThreadLocalHandle {
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 40ba5c42492..3ca828a7f35 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -1085,9 +1085,6 @@ HashJoinNode::~HashJoinNode() {
         // signal at here is abnormal
         _shared_hashtable_controller->signal(id(), Status::Cancelled("signaled 
in destructor"));
     }
-    if (_runtime_filter_slots != nullptr) {
-        _runtime_filter_slots->finish_publish();
-    }
 }
 
 void HashJoinNode::_release_mem() {
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index c38f8f563ea..a017633e5ce 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -226,13 +226,6 @@ public:
 
     bool should_build_hash_table() const { return _should_build_hash_table; }
 
-    bool ready_for_finish() {
-        if (_runtime_filter_slots == nullptr) {
-            return true;
-        }
-        return _runtime_filter_slots->ready_finish_publish();
-    }
-
     bool have_other_join_conjunct() const { return _have_other_join_conjunct; }
     bool is_right_semi_anti() const { return _is_right_semi_anti; }
     bool is_outer_join() const { return _is_outer_join; }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to