This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6d5d08e6309 [Chore](runtime-filter) add rpc error msg to
RuntimeFilterContext (#43517) (#44622)
6d5d08e6309 is described below
commit 6d5d08e63092d57d11290778eff396fa05c28d25
Author: Pxl <[email protected]>
AuthorDate: Tue Nov 26 22:05:59 2024 +0800
[Chore](runtime-filter) add rpc error msg to RuntimeFilterContext (#43517)
(#44622)
pick from #43517
---
be/src/exprs/runtime_filter.cpp | 43 +++++++++++------------
be/src/vec/runtime/shared_hash_table_controller.h | 1 +
2 files changed, 22 insertions(+), 22 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 7c5de4891f5..8d4c9d86932 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -22,7 +22,6 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
-#include <stddef.h>
#include <algorithm>
// IWYU pragma: no_include <bits/chrono.h>
@@ -1063,30 +1062,33 @@ class SyncSizeClosure : public
AutoReleaseClosure<PSendFilterSizeRequest,
// context, it not the memory is not released. And rpc is in another
thread, it will hold rf context
// after query context because the rpc is not returned.
std::weak_ptr<RuntimeFilterContext> _rf_context;
- std::string _rf_debug_info;
using Base =
AutoReleaseClosure<PSendFilterSizeRequest,
DummyBrpcCallback<PSendFilterSizeResponse>>;
ENABLE_FACTORY_CREATOR(SyncSizeClosure);
void _process_if_rpc_failed() override {
- ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
- LOG(WARNING) << "sync filter size meet rpc error, filter=" <<
_rf_debug_info;
+ Defer defer {[&]() {
((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }};
+ auto ctx = _rf_context.lock();
+ if (!ctx) {
+ return;
+ }
+
+ ctx->err_msg = cntl_->ErrorText();
Base::_process_if_rpc_failed();
}
void _process_if_meet_error_status(const Status& status) override {
- ((pipeline::CountedFinishDependency*)_dependency.get())->sub();
+ Defer defer {[&]() {
((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }};
+ auto ctx = _rf_context.lock();
+ if (!ctx) {
+ return;
+ }
+
if (status.is<ErrorCode::END_OF_FILE>()) {
// rf merger backend may finished before rf's send_filter_size, we
just ignore filter in this case.
- auto ctx = _rf_context.lock();
- if (ctx) {
- ctx->ignored = true;
- } else {
- LOG(WARNING) << "sync filter size returned but context is
released, filter="
- << _rf_debug_info;
- }
+ ctx->ignored = true;
} else {
- LOG(WARNING) << "sync filter size meet error status, filter=" <<
_rf_debug_info;
+ ctx->err_msg = status.to_string();
Base::_process_if_meet_error_status(status);
}
}
@@ -1095,11 +1097,8 @@ public:
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
std::shared_ptr<pipeline::Dependency> dependency,
- RuntimeFilterContextSPtr rf_context, std::string_view
rf_debug_info)
- : Base(req, callback),
- _dependency(std::move(dependency)),
- _rf_context(rf_context),
- _rf_debug_info(rf_debug_info) {}
+ RuntimeFilterContextSPtr rf_context)
+ : Base(req, callback), _dependency(std::move(dependency)),
_rf_context(rf_context) {}
};
Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t
local_filter_size) {
@@ -1146,8 +1145,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState*
state, uint64_t local_filt
auto callback =
DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
// IRuntimeFilter maybe deconstructed before the rpc finished, so that
could not use
// a raw pointer in closure. Has to use the context's shared ptr.
- auto closure = SyncSizeClosure::create_unique(request, callback,
_dependency,
- _wrapper->_context,
this->debug_string());
+ auto closure =
+ SyncSizeClosure::create_unique(request, callback, _dependency,
_wrapper->_context);
auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(_state->get_query_ctx()->query_id().hi);
pquery_id->set_lo(_state->get_query_ctx()->query_id().lo);
@@ -1531,9 +1530,9 @@ void
IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_
std::string IRuntimeFilter::debug_string() const {
return fmt::format(
"RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, "
- "build_bf_cardinality: {}",
+ "build_bf_cardinality: {}, error_msg: {}",
_filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
- _wrapper->get_build_bf_cardinality());
+ _wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg);
}
Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h
b/be/src/vec/runtime/shared_hash_table_controller.h
index 1ca7325e8cb..ae5917efe91 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -46,6 +46,7 @@ struct RuntimeFilterContext {
std::shared_ptr<BloomFilterFuncBase> bloom_filter_func;
std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func;
bool ignored = false;
+ std::string err_msg;
};
using RuntimeFilterContextSPtr = std::shared_ptr<RuntimeFilterContext>;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]