This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2573150f6df7c5eaf701d1d11d07fd975ac7a724 Author: yiguolei <[email protected]> AuthorDate: Thu Feb 8 22:15:32 2024 +0800 [refactor](runtime filter) do not wait runtime filter rpc finished when hash node or pipeline finished (#30970) Co-authored-by: yiguolei <[email protected]> --- be/src/common/logging.h | 5 +++ be/src/exprs/runtime_filter.h | 8 ---- be/src/exprs/runtime_filter_rpc.cpp | 71 +++++++++--------------------- be/src/exprs/runtime_filter_slots.h | 19 -------- be/src/pipeline/exec/hashjoin_build_sink.h | 1 - be/src/runtime/thread_context.h | 5 --- be/src/vec/exec/join/vhash_join_node.cpp | 3 -- be/src/vec/exec/join/vhash_join_node.h | 7 --- 8 files changed, 25 insertions(+), 94 deletions(-) diff --git a/be/src/common/logging.h b/be/src/common/logging.h index 672edb84e6a..36e5fff9904 100644 --- a/be/src/common/logging.h +++ b/be/src/common/logging.h @@ -110,6 +110,11 @@ private: google::LogMessage _msg; }; +// Very very important!!!! +// Never define LOG_DEBUG or LOG_TRACE, because the tagged logging method will +// always generate the string and then check the log level, so its performance is bad. +// glog's original method will first check the log level; if it is not satisfied, +// the log message is not generated. 
#define LOG_INFO TaggableLogger(__FILE__, __LINE__, google::GLOG_INFO) #define LOG_WARNING TaggableLogger(__FILE__, __LINE__, google::GLOG_WARNING) #define LOG_ERROR TaggableLogger(__FILE__, __LINE__, google::GLOG_ERROR) diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index f68c0ec250c..6d69302c2ea 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -296,8 +296,6 @@ public: // for ut bool is_bloomfilter(); - bool is_finish_rpc(); - Status join_rpc(); // async push runtimefilter to remote node @@ -419,15 +417,9 @@ protected: // used for await or signal std::mutex _inner_mutex; std::condition_variable _inner_cv; - bool _is_push_down = false; - TExpr _probe_expr; - struct RPCContext; - - std::shared_ptr<RPCContext> _rpc_context; - /// Time in ms (from MonotonicMillis()), that the filter was registered. const int64_t registration_time_; /// runtime filter wait time will be ignored if wait_infinitely is true diff --git a/be/src/exprs/runtime_filter_rpc.cpp b/be/src/exprs/runtime_filter_rpc.cpp index 4ca4a78247c..9c51b7217c1 100644 --- a/be/src/exprs/runtime_filter_rpc.cpp +++ b/be/src/exprs/runtime_filter_rpc.cpp @@ -35,23 +35,13 @@ #include "common/logging.h" #include "util/brpc_client_cache.h" +#include "util/ref_count_closure.h" namespace doris { -struct IRuntimeFilter::RPCContext { - PMergeFilterRequest request; - PMergeFilterResponse response; - brpc::Controller cntl; - brpc::CallId cid; - bool is_finished = false; - - static void finish(std::shared_ptr<RPCContext> ctx) { ctx->is_finished = true; } -}; - Status IRuntimeFilter::push_to_remote(RuntimeFilterParamsContext* state, const TNetworkAddress* addr, bool opt_remote_rf) { DCHECK(is_producer()); - DCHECK(_rpc_context == nullptr); std::shared_ptr<PBackendService_Stub> stub( state->exec_env->brpc_internal_client_cache()->get_client(*addr)); if (!stub) { @@ -59,64 +49,43 @@ Status IRuntimeFilter::push_to_remote(RuntimeFilterParamsContext* state, 
fmt::format("Get rpc stub failed, host={}, port=", addr->hostname, addr->port); return Status::InternalError(msg); } - _rpc_context = std::make_shared<IRuntimeFilter::RPCContext>(); + + auto merge_filter_request = std::make_shared<PMergeFilterRequest>(); + auto merge_filter_callback = DummyBrpcCallback<PMergeFilterResponse>::create_shared(); + auto merge_filter_closure = + AutoReleaseClosure<PMergeFilterRequest, DummyBrpcCallback<PMergeFilterResponse>>:: + create_unique(merge_filter_request, merge_filter_callback); void* data = nullptr; int len = 0; - auto pquery_id = _rpc_context->request.mutable_query_id(); + auto pquery_id = merge_filter_request->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); - auto pfragment_instance_id = _rpc_context->request.mutable_fragment_instance_id(); + auto pfragment_instance_id = merge_filter_request->mutable_fragment_instance_id(); pfragment_instance_id->set_hi(state->fragment_instance_id().hi()); pfragment_instance_id->set_lo(state->fragment_instance_id().lo()); - _rpc_context->request.set_filter_id(_filter_id); - _rpc_context->request.set_opt_remote_rf(opt_remote_rf); - _rpc_context->request.set_is_pipeline(state->enable_pipeline_exec); - _rpc_context->cntl.set_timeout_ms(wait_time_ms()); - _rpc_context->cid = _rpc_context->cntl.call_id(); + merge_filter_request->set_filter_id(_filter_id); + merge_filter_request->set_opt_remote_rf(opt_remote_rf); + merge_filter_request->set_is_pipeline(state->enable_pipeline_exec); + merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms()); - Status serialize_status = serialize(&_rpc_context->request, &data, &len); + Status serialize_status = serialize(merge_filter_request.get(), &data, &len); if (serialize_status.ok()) { - VLOG_NOTICE << "Producer:" << _rpc_context->request.ShortDebugString() << addr->hostname + VLOG_NOTICE << "Producer:" << merge_filter_request->ShortDebugString() << addr->hostname << ":" << addr->port; if (len > 0) { 
DCHECK(data != nullptr); - _rpc_context->cntl.request_attachment().append(data, len); + merge_filter_callback->cntl_->request_attachment().append(data, len); } - stub->merge_filter(&_rpc_context->cntl, &_rpc_context->request, &_rpc_context->response, - brpc::NewCallback(RPCContext::finish, _rpc_context)); - - } else { - // we should reset context - _rpc_context.reset(); + stub->merge_filter(merge_filter_closure->cntl_.get(), merge_filter_closure->request_.get(), + merge_filter_closure->response_.get(), merge_filter_closure.get()); + // the closure will be released by brpc during closure->Run. + merge_filter_closure.release(); } return serialize_status; } -bool IRuntimeFilter::is_finish_rpc() { - if (_rpc_context == nullptr) { - return true; - } - return _rpc_context->is_finished; -} - -Status IRuntimeFilter::join_rpc() { - if (!is_producer()) { - return Status::InternalError("RuntimeFilter::join_rpc only called when rf is producer."); - } - if (_rpc_context != nullptr) { - brpc::Join(_rpc_context->cid); - if (_rpc_context->cntl.Failed()) { - // reset stub cache - ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( - _rpc_context->cntl.remote_side()); - return Status::InternalError("RuntimeFilter::join_rpc meet rpc error, msg={}.", - _rpc_context->cntl.ErrorText()); - } - } - return Status::OK(); -} } // namespace doris diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index e1a1f871b95..3dcf84ace08 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -184,25 +184,6 @@ public: } } - bool ready_finish_publish() { - for (auto& pair : _runtime_filters) { - for (auto* filter : pair.second) { - if (!filter->is_finish_rpc()) { - return false; - } - } - } - return true; - } - - void finish_publish() { - for (auto& pair : _runtime_filters) { - for (auto* filter : pair.second) { - static_cast<void>(filter->join_rpc()); - } - } - } - // publish runtime filter Status publish(bool 
publish_local = false) { for (auto& pair : _runtime_filters) { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index efc0a46f3ba..c3d6038b3eb 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -41,7 +41,6 @@ class HashJoinBuildSink final : public StreamingOperator<vectorized::HashJoinNod public: HashJoinBuildSink(OperatorBuilderBase* operator_builder, ExecNode* node); bool can_write() override { return _node->can_sink_write(); } - bool is_pending_finish() const override { return !_node->ready_for_finish(); } }; class HashJoinBuildSinkOperatorX; diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index cbe1a19cae6..85ca125f36f 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -144,25 +144,21 @@ public: << ", attach mem tracker label: " << mem_tracker->label(); #endif _task_id = task_id; - _fragment_instance_id = fragment_instance_id; thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, fragment_instance_id); } void detach_task() { _task_id = TUniqueId(); - _fragment_instance_id = TUniqueId(); thread_mem_tracker_mgr->detach_limiter_tracker(); } [[nodiscard]] const TUniqueId& task_id() const { return _task_id; } - [[nodiscard]] const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } static std::string get_thread_id() { std::stringstream ss; ss << std::this_thread::get_id(); return ss.str(); } - // After thread_mem_tracker_mgr is initialized, the current thread Hook starts to // consume/release mem_tracker. // Note that the use of shared_ptr will cause a crash. 
The guess is that there is an @@ -184,7 +180,6 @@ public: private: TUniqueId _task_id; - TUniqueId _fragment_instance_id; }; class ThreadLocalHandle { diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 40ba5c42492..3ca828a7f35 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -1085,9 +1085,6 @@ HashJoinNode::~HashJoinNode() { // signal at here is abnormal _shared_hashtable_controller->signal(id(), Status::Cancelled("signaled in destructor")); } - if (_runtime_filter_slots != nullptr) { - _runtime_filter_slots->finish_publish(); - } } void HashJoinNode::_release_mem() { diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index c38f8f563ea..a017633e5ce 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -226,13 +226,6 @@ public: bool should_build_hash_table() const { return _should_build_hash_table; } - bool ready_for_finish() { - if (_runtime_filter_slots == nullptr) { - return true; - } - return _runtime_filter_slots->ready_finish_publish(); - } - bool have_other_join_conjunct() const { return _have_other_join_conjunct; } bool is_right_semi_anti() const { return _is_right_semi_anti; } bool is_outer_join() const { return _is_outer_join; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
