This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch cp_1126_2 in repository https://gitbox.apache.org/repos/asf/doris.git
commit a7fd714e6ca9d7b7d3a6085a516d64c339d6eb07 Author: Pxl <[email protected]> AuthorDate: Mon Nov 11 14:19:13 2024 +0800 [Chore](runtime-filter) add rpc error msg to RuntimeFilterContext (#43517) add rpc error msg to RuntimeFilterContext --- be/src/exprs/runtime_filter.cpp | 43 +++++++++++------------ be/src/vec/runtime/shared_hash_table_controller.h | 1 + 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 78d13aac279..a8038d8d878 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -22,7 +22,6 @@ #include <gen_cpp/PlanNodes_types.h> #include <gen_cpp/Types_types.h> #include <gen_cpp/internal_service.pb.h> -#include <stddef.h> #include <algorithm> // IWYU pragma: no_include <bits/chrono.h> @@ -1063,30 +1062,33 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest, // context, it not the memory is not released. And rpc is in another thread, it will hold rf context // after query context because the rpc is not returned. std::weak_ptr<RuntimeFilterContext> _rf_context; - std::string _rf_debug_info; using Base = AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>; ENABLE_FACTORY_CREATOR(SyncSizeClosure); void _process_if_rpc_failed() override { - ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); - LOG(WARNING) << "sync filter size meet rpc error, filter=" << _rf_debug_info; + Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }}; + auto ctx = _rf_context.lock(); + if (!ctx) { + return; + } + + ctx->err_msg = cntl_->ErrorText(); Base::_process_if_rpc_failed(); } void _process_if_meet_error_status(const Status& status) override { - ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); + Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }}; + auto ctx = _rf_context.lock(); + if (!ctx) { + return; + } + if (status.is<ErrorCode::END_OF_FILE>()) { // rf merger backend may finished before rf's send_filter_size, we just ignore filter in this case. - auto ctx = _rf_context.lock(); - if (ctx) { - ctx->ignored = true; - } else { - LOG(WARNING) << "sync filter size returned but context is released, filter=" - << _rf_debug_info; - } + ctx->ignored = true; } else { - LOG(WARNING) << "sync filter size meet error status, filter=" << _rf_debug_info; + ctx->err_msg = status.to_string(); Base::_process_if_meet_error_status(status); } } @@ -1095,11 +1097,8 @@ public: SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req, std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback, std::shared_ptr<pipeline::Dependency> dependency, - RuntimeFilterContextSPtr rf_context, std::string_view rf_debug_info) - : Base(req, callback), - _dependency(std::move(dependency)), - _rf_context(rf_context), - _rf_debug_info(rf_debug_info) {} + RuntimeFilterContextSPtr rf_context) + : Base(req, callback), _dependency(std::move(dependency)), _rf_context(rf_context) {} }; Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) { @@ -1146,8 +1145,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared(); // IRuntimeFilter maybe deconstructed before the rpc finished, so that could not use // a raw pointer in closure. Has to use the context's shared ptr. - auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, - _wrapper->_context, this->debug_string()); + auto closure = + SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper->_context); auto* pquery_id = request->mutable_query_id(); pquery_id->set_hi(_state->get_query_ctx()->query_id().hi); pquery_id->set_lo(_state->get_query_ctx()->query_id().lo); @@ -1530,9 +1529,9 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_ std::string IRuntimeFilter::debug_string() const { return fmt::format( "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, " - "build_bf_cardinality: {}", + "build_bf_cardinality: {}, error_msg: {}", _filter_id, to_string(_runtime_filter_type), _is_broadcast_join, - _wrapper->get_build_bf_cardinality()); + _wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg); } Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index 1ca7325e8cb..ae5917efe91 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -46,6 +46,7 @@ struct RuntimeFilterContext { std::shared_ptr<BloomFilterFuncBase> bloom_filter_func; std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func; bool ignored = false; + std::string err_msg; }; using RuntimeFilterContextSPtr = std::shared_ptr<RuntimeFilterContext>; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
