This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new cf28b101547 [Bug](runtime-filter) fix some rf error problems (#37273)
cf28b101547 is described below
commit cf28b1015473683313b895752a93dcd1e6bf729e
Author: Pxl <[email protected]>
AuthorDate: Thu Jul 4 19:31:50 2024 +0800
[Bug](runtime-filter) fix some rf error problems (#37273)
## Proposed changes
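1. Ignore the runtime filter (rf) when the rf manager on the merge backend has already been released (e.g. the query finished there before send_filter_size arrived).
2. In send_filter_size, acquire the rf merge controller only after the query_ctx has been acquired.
3. Enlarge the RPC timeout for sync_filter_size/apply_filterv2 to the query execution timeout (capped at 3600 s).
4. Log the rf's debug string when an RPC meets an error.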
---
be/src/exprs/runtime_filter.cpp | 17 +++++++++++++----
be/src/exprs/runtime_filter_slots.h | 9 +++++++++
be/src/pipeline/pipeline_fragment_context.h | 3 ---
be/src/runtime/fragment_mgr.cpp | 9 +++++----
be/src/runtime/runtime_filter_mgr.cpp | 2 ++
5 files changed, 29 insertions(+), 11 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 2dcfc97b096..e69ff714d32 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1033,25 +1033,34 @@ Status IRuntimeFilter::publish(bool publish_local) {
class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
DummyBrpcCallback<PSendFilterSizeResponse>> {
std::shared_ptr<pipeline::Dependency> _dependency;
+ IRuntimeFilter* _filter;
using Base =
AutoReleaseClosure<PSendFilterSizeRequest,
DummyBrpcCallback<PSendFilterSizeResponse>>;
ENABLE_FACTORY_CREATOR(SyncSizeClosure);
void _process_if_rpc_failed() override {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
+ LOG(WARNING) << "sync filter size meet rpc error, filter=" <<
_filter->debug_string();
Base::_process_if_rpc_failed();
}
void _process_if_meet_error_status(const Status& status) override {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
- Base::_process_if_meet_error_status(status);
+ if (status.is<ErrorCode::END_OF_FILE>()) {
+ // rf merger backend may finished before rf's send_filter_size, we
just ignore filter in this case.
+ _filter->set_ignored();
+ } else {
+ LOG(WARNING) << "sync filter size meet error status, filter="
+ << _filter->debug_string();
+ Base::_process_if_meet_error_status(status);
+ }
}
public:
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
- std::shared_ptr<pipeline::Dependency> dependency)
- : Base(req, callback), _dependency(std::move(dependency)) {}
+ std::shared_ptr<pipeline::Dependency> dependency,
IRuntimeFilter* filter)
+ : Base(req, callback), _dependency(std::move(dependency)),
_filter(filter) {}
};
Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t
local_filter_size) {
@@ -1094,7 +1103,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState*
state, uint64_t local_filt
auto request = std::make_shared<PSendFilterSizeRequest>();
auto callback =
DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
- auto closure = SyncSizeClosure::create_unique(request, callback,
_dependency);
+ auto closure = SyncSizeClosure::create_unique(request, callback,
_dependency, this);
auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index 0bf8a33f9f2..ebda4b56fcc 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -71,6 +71,9 @@ public:
// process ignore duplicate IN_FILTER
std::unordered_set<int> has_in_filter;
for (auto* filter : _runtime_filters) {
+ if (filter->get_ignored()) {
+ continue;
+ }
if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
continue;
}
@@ -83,6 +86,9 @@ public:
// process ignore filter when it has IN_FILTER on same expr, and init
bloom filter size
for (auto* filter : _runtime_filters) {
+ if (filter->get_ignored()) {
+ continue;
+ }
if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
!has_in_filter.contains(filter->expr_order())) {
continue;
@@ -95,6 +101,9 @@ public:
Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
// process IN_OR_BLOOM_FILTER's real type
for (auto* filter : _runtime_filters) {
+ if (filter->get_ignored()) {
+ continue;
+ }
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
get_real_size(filter, local_hash_table_size) >
state->runtime_filter_max_in_num()) {
RETURN_IF_ERROR(filter->change_to_bloom_filter());
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 94dd96731c2..3b6c73dbef4 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -249,9 +249,6 @@ private:
bool _need_local_merge = false;
- // It is used to manage the lifecycle of RuntimeFilterMergeController
- std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>>
_merge_controller_handlers;
-
// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
// of it in pipeline task not the fragment_context
#ifdef __clang__
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fe7f0d13c2b..a23095e78bd 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1082,8 +1082,6 @@ Status FragmentMgr::apply_filterv2(const
PPublishFilterRequestV2* request,
Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
UniqueId queryid = request->query_id();
- std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
- RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid,
&filter_controller));
std::shared_ptr<QueryContext> query_ctx;
{
@@ -1094,10 +1092,13 @@ Status FragmentMgr::send_filter_size(const
PSendFilterSizeRequest* request) {
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
- return Status::InvalidArgument("Query context (query-id: {}) not
found",
- queryid.to_string());
+ return Status::EndOfFile("Query context (query-id: {}) not found,
maybe finished",
+ queryid.to_string());
}
}
+
+ std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
+ RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid,
&filter_controller));
auto merge_status = filter_controller->send_filter_size(request);
return merge_status;
}
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index c9812508446..0e5b37c8ffa 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -341,6 +341,7 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
auto* pquery_id = closure->request_->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
+ closure->cntl_->set_timeout_ms(std::min(3600,
_state->execution_timeout) * 1000);
closure->request_->set_filter_id(filter_id);
closure->request_->set_filter_size(cnt_val->global_size);
@@ -453,6 +454,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
if (has_attachment) {
closure->cntl_->request_attachment().append(request_attachment);
}
+ closure->cntl_->set_timeout_ms(std::min(3600,
_state->execution_timeout) * 1000);
// set fragment-id
for (auto& target_fragment_instance_id :
target.target_fragment_instance_ids) {
PUniqueId* cur_id =
closure->request_->add_fragment_instance_ids();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]