This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 33726b0e5a8 [Feature](runtime-filter) cancel query when
runtime_filter's rpc failed (#43627)
33726b0e5a8 is described below
commit 33726b0e5a8c49bc72207d21e209ba3d4d3ce9e3
Author: Pxl <[email protected]>
AuthorDate: Mon Nov 18 10:17:28 2024 +0800
[Feature](runtime-filter) cancel query when runtime_filter's rpc failed
(#43627)
### What problem does this PR solve?
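Cancel the query when a runtime filter RPC fails. Setting the new session variable `ignore_runtime_filter_error` (default: false) to true restores the previous behavior of only logging the failure.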
---
be/src/common/status.h | 1 +
be/src/exprs/runtime_filter.cpp | 23 +++++++++++++--------
be/src/exprs/runtime_filter.h | 4 ++--
be/src/exprs/runtime_filter_slots.h | 4 ++--
be/src/exprs/runtime_filter_slots_cross.h | 4 ++--
be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 +-
.../exec/nested_loop_join_build_operator.cpp | 2 +-
be/src/runtime/fragment_mgr.cpp | 4 ++--
be/src/runtime/query_context.h | 6 ++++++
be/src/runtime/runtime_filter_mgr.cpp | 14 +++++++++----
be/src/runtime/runtime_filter_mgr.h | 5 +++--
be/src/runtime/runtime_state.cpp | 6 +++++-
be/src/runtime/runtime_state.h | 2 ++
be/src/util/ref_count_closure.h | 24 +++++++++++++++++-----
.../java/org/apache/doris/qe/SessionVariable.java | 5 +++++
gensrc/thrift/PaloInternalService.thrift | 2 ++
16 files changed, 78 insertions(+), 30 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index de029d87ec9..344f82a81b8 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -489,6 +489,7 @@ public:
ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
ERROR_CTOR_NOSTACK(CgroupError, CGROUP_ERROR)
ERROR_CTOR_NOSTACK(ObtainLockFailed, OBTAIN_LOCK_FAILED)
+ ERROR_CTOR_NOSTACK(NetworkError, NETWORK_ERROR)
#undef ERROR_CTOR
template <int code>
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 85f1c535c70..d05bb6fa3cf 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -990,14 +990,14 @@ void IRuntimeFilter::insert_batch(const
vectorized::ColumnPtr column, size_t sta
_wrapper->insert_batch(column, start);
}
-Status IRuntimeFilter::publish(bool publish_local) {
+Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) {
DCHECK(is_producer());
auto send_to_remote = [&](IRuntimeFilter* filter) {
TNetworkAddress addr;
DCHECK(_state != nullptr);
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
- return filter->push_to_remote(&addr);
+ return filter->push_to_remote(state, &addr);
};
auto send_to_local = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper)
{
std::vector<std::shared_ptr<IRuntimeFilter>> filters;
@@ -1088,8 +1088,10 @@ public:
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
std::shared_ptr<pipeline::Dependency> dependency,
- RuntimeFilterContextSPtr rf_context)
- : Base(req, callback), _dependency(std::move(dependency)),
_rf_context(rf_context) {}
+ RuntimeFilterContextSPtr rf_context,
std::weak_ptr<QueryContext> context)
+ : Base(req, callback, context),
+ _dependency(std::move(dependency)),
+ _rf_context(rf_context) {}
};
Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t
local_filter_size) {
@@ -1133,8 +1135,10 @@ Status IRuntimeFilter::send_filter_size(RuntimeState*
state, uint64_t local_filt
auto callback =
DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
// IRuntimeFilter maybe deconstructed before the rpc finished, so that
could not use
// a raw pointer in closure. Has to use the context's shared ptr.
- auto closure =
- SyncSizeClosure::create_unique(request, callback, _dependency,
_wrapper->_context);
+ auto closure = SyncSizeClosure::create_unique(
+ request, callback, _dependency, _wrapper->_context,
+ state->query_options().ignore_runtime_filter_error ?
std::weak_ptr<QueryContext> {}
+ :
state->get_query_ctx_weak());
auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
@@ -1157,7 +1161,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState*
state, uint64_t local_filt
return Status::OK();
}
-Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
+Status IRuntimeFilter::push_to_remote(RuntimeState* state, const
TNetworkAddress* addr) {
DCHECK(is_producer());
std::shared_ptr<PBackendService_Stub> stub(
_state->exec_env->brpc_internal_client_cache()->get_client(*addr));
@@ -1170,7 +1174,10 @@ Status IRuntimeFilter::push_to_remote(const
TNetworkAddress* addr) {
auto merge_filter_callback =
DummyBrpcCallback<PMergeFilterResponse>::create_shared();
auto merge_filter_closure =
AutoReleaseClosure<PMergeFilterRequest,
DummyBrpcCallback<PMergeFilterResponse>>::
- create_unique(merge_filter_request, merge_filter_callback);
+ create_unique(merge_filter_request, merge_filter_callback,
+
state->query_options().ignore_runtime_filter_error
+ ? std::weak_ptr<QueryContext> {}
+ : state->get_query_ctx_weak());
void* data = nullptr;
int len = 0;
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index f5a069d9e55..84a7f36c8a8 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -225,7 +225,7 @@ public:
// publish filter
// push filter to remote node or push down it to scan_node
- Status publish(bool publish_local = false);
+ Status publish(RuntimeState* state, bool publish_local = false);
Status send_filter_size(RuntimeState* state, uint64_t local_filter_size);
@@ -293,7 +293,7 @@ public:
bool need_sync_filter_size();
// async push runtimefilter to remote node
- Status push_to_remote(const TNetworkAddress* addr);
+ Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr);
void init_profile(RuntimeProfile* parent_profile);
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index 42c5f598633..3c18735e4e8 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -149,10 +149,10 @@ public:
}
// publish runtime filter
- Status publish(bool publish_local) {
+ Status publish(RuntimeState* state, bool publish_local) {
for (auto& pair : _runtime_filters_map) {
for (auto& filter : pair.second) {
- RETURN_IF_ERROR(filter->publish(publish_local));
+ RETURN_IF_ERROR(filter->publish(state, publish_local));
}
}
return Status::OK();
diff --git a/be/src/exprs/runtime_filter_slots_cross.h
b/be/src/exprs/runtime_filter_slots_cross.h
index 01ae21a7599..a49f2928f84 100644
--- a/be/src/exprs/runtime_filter_slots_cross.h
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -72,9 +72,9 @@ public:
return Status::OK();
}
- Status publish() {
+ Status publish(RuntimeState* state) {
for (auto filter : _runtime_filters) {
- RETURN_IF_ERROR(filter->publish());
+ RETURN_IF_ERROR(filter->publish(state));
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 37de9ac93d8..74db3a5c06c 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -157,7 +157,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
}
}
SCOPED_TIMER(_publish_runtime_filter_timer);
- RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
+ RETURN_IF_ERROR(_runtime_filter_slots->publish(state,
!_should_build_hash_table));
return Base::close(state, exec_status);
}
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index 83b378e792c..41cd8068dd7 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -43,7 +43,7 @@ struct RuntimeFilterBuild {
}
{
SCOPED_TIMER(_parent->publish_runtime_filter_timer());
- RETURN_IF_ERROR(runtime_filter_slots.publish());
+ RETURN_IF_ERROR(runtime_filter_slots.publish(state));
}
return Status::OK();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 2896190f606..c4f633d84aa 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1271,7 +1271,7 @@ Status FragmentMgr::send_filter_size(const
PSendFilterSizeRequest* request) {
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid,
&filter_controller));
- auto merge_status = filter_controller->send_filter_size(request);
+ auto merge_status = filter_controller->send_filter_size(query_ctx,
request);
return merge_status;
}
@@ -1313,7 +1313,7 @@ Status FragmentMgr::merge_filter(const
PMergeFilterRequest* request,
SCOPED_ATTACH_TASK(query_ctx.get());
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid,
&filter_controller));
- auto merge_status = filter_controller->merge(request, attach_data);
+ auto merge_status = filter_controller->merge(query_ctx, request,
attach_data);
return merge_status;
}
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 47465530405..d557245bf23 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -165,6 +165,12 @@ public:
return _query_options.__isset.fe_process_uuid ?
_query_options.fe_process_uuid : 0;
}
+ bool ignore_runtime_filter_error() const {
+ return _query_options.__isset.ignore_runtime_filter_error
+ ? _query_options.ignore_runtime_filter_error
+ : false;
+ }
+
// global runtime filter mgr, the runtime filter have remote target or
// need local merge should regist here. before publish() or
push_to_remote()
// the runtime filter should do the local merge work
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 1a238787207..4b4f4880123 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -305,7 +305,8 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId
query_id,
return Status::OK();
}
-Status RuntimeFilterMergeControllerEntity::send_filter_size(const
PSendFilterSizeRequest* request) {
+Status
RuntimeFilterMergeControllerEntity::send_filter_size(std::weak_ptr<QueryContext>
query_ctx,
+ const
PSendFilterSizeRequest* request) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
std::shared_ptr<RuntimeFilterCntlVal> cnt_val;
@@ -326,6 +327,8 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
Status st = Status::OK();
if (cnt_val->source_addrs.size() == cnt_val->producer_size) {
+ auto ctx = query_ctx.lock()->ignore_runtime_filter_error() ?
std::weak_ptr<QueryContext> {}
+ : query_ctx;
for (auto addr : cnt_val->source_addrs) {
std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
@@ -339,7 +342,7 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
DummyBrpcCallback<PSyncFilterSizeResponse>>::
create_unique(std::make_shared<PSyncFilterSizeRequest>(),
-
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared());
+
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);
auto* pquery_id = closure->request_->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
@@ -377,7 +380,8 @@ Status RuntimeFilterMgr::sync_filter_size(const
PSyncFilterSizeRequest* request)
}
// merge data
-Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest*
request,
+Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext>
query_ctx,
+ const PMergeFilterRequest*
request,
butil::IOBufAsZeroCopyInputStream* attach_data) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
std::shared_ptr<RuntimeFilterCntlVal> cnt_val;
@@ -444,12 +448,14 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
has_attachment = true;
}
+ auto ctx = query_ctx.lock()->ignore_runtime_filter_error() ?
std::weak_ptr<QueryContext> {}
+ : query_ctx;
std::vector<TRuntimeFilterTargetParamsV2>& targets =
cnt_val->targetv2_info;
for (auto& target : targets) {
auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
DummyBrpcCallback<PPublishFilterResponse>>::
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
-
DummyBrpcCallback<PPublishFilterResponse>::create_shared());
+
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);
closure->request_->set_filter_id(request->filter_id());
closure->request_->set_merge_time(merge_time);
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index b0aea7568cf..bac61d6248a 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -156,10 +156,11 @@ public:
const TQueryOptions& query_options);
// handle merge rpc
- Status merge(const PMergeFilterRequest* request,
+ Status merge(std::weak_ptr<QueryContext> query_ctx, const
PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data);
- Status send_filter_size(const PSendFilterSizeRequest* request);
+ Status send_filter_size(std::weak_ptr<QueryContext> query_ctx,
+ const PSendFilterSizeRequest* request);
UniqueId query_id() const { return _query_id; }
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index e3f9d075c8f..38522f49dc3 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -40,6 +40,7 @@
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_task.h"
#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
#include "runtime/load_path_mgr.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/thread_mem_tracker_mgr.h"
@@ -129,7 +130,6 @@
RuntimeState::RuntimeState(pipeline::PipelineFragmentContext*, const TUniqueId&
: _profile("Fragment " + print_id(instance_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
- _runtime_filter_mgr(nullptr),
_unreported_error_idx(0),
_query_id(query_id),
_fragment_id(fragment_id),
@@ -294,6 +294,10 @@ Status RuntimeState::init(const TUniqueId&
fragment_instance_id, const TQueryOpt
return Status::OK();
}
+std::weak_ptr<QueryContext> RuntimeState::get_query_ctx_weak() {
+ return
_exec_env->fragment_mgr()->get_or_erase_query_ctx_with_lock(_query_ctx->query_id());
+}
+
void RuntimeState::init_mem_trackers(const std::string& name, const TUniqueId&
id) {
_query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER, fmt::format("{}#Id={}", name,
print_id(id)));
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 88deee491d1..73f854896f4 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -449,6 +449,8 @@ public:
QueryContext* get_query_ctx() { return _query_ctx; }
+ std::weak_ptr<QueryContext> get_query_ctx_weak();
+
void set_query_mem_tracker(const std::shared_ptr<MemTrackerLimiter>&
tracker) {
_query_mem_tracker = tracker;
}
diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h
index 92772a82373..560aebb98ee 100644
--- a/be/src/util/ref_count_closure.h
+++ b/be/src/util/ref_count_closure.h
@@ -20,7 +20,9 @@
#include <google/protobuf/stubs/common.h>
#include <atomic>
+#include <utility>
+#include "runtime/query_context.h"
#include "runtime/thread_context.h"
#include "service/brpc.h"
#include "util/ref_count_closure.h"
@@ -79,8 +81,9 @@ class AutoReleaseClosure : public google::protobuf::Closure {
ENABLE_FACTORY_CREATOR(AutoReleaseClosure);
public:
- AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback>
callback)
- : request_(req), callback_(callback) {
+ AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback>
callback,
+ std::weak_ptr<QueryContext> context = {})
+ : request_(req), callback_(callback), context_(std::move(context))
{
this->cntl_ = callback->cntl_;
this->response_ = callback->response_;
}
@@ -113,12 +116,22 @@ public:
protected:
virtual void _process_if_rpc_failed() {
- LOG(WARNING) << "RPC meet failed: " << cntl_->ErrorText();
+ std::string error_msg = "RPC meet failed: " + cntl_->ErrorText();
+ if (auto ctx = context_.lock(); ctx) {
+ ctx->cancel(Status::NetworkError(error_msg));
+ } else {
+ LOG(WARNING) << error_msg;
+ }
}
virtual void _process_if_meet_error_status(const Status& status) {
- // no need to log END_OF_FILE, reduce the unlessful log
- if (!status.is<ErrorCode::END_OF_FILE>()) {
+ if (status.is<ErrorCode::END_OF_FILE>()) {
+ // no need to log END_OF_FILE, reduce the unlessful log
+ return;
+ }
+ if (auto ctx = context_.lock(); ctx) {
+ ctx->cancel(status);
+ } else {
LOG(WARNING) << "RPC meet error status: " << status;
}
}
@@ -136,6 +149,7 @@ private:
// Use a weak ptr to keep the callback, so that the callback can be
deleted if the main
// thread is freed.
Weak callback_;
+ std::weak_ptr<QueryContext> context_;
};
} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8cb2795bbfc..c5274a38647 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1173,6 +1173,10 @@ public class SessionVariable implements Serializable,
Writable {
"Force the sort algorithm of SortNode to be specified" })
public String forceSortAlgorithm = "";
+ @VariableMgr.VarAttr(name = "ignore_runtime_filter_error", needForward =
true, description = { "在rf遇到错误的时候忽略该rf",
+ "Ignore the rf when it encounters an error" })
+ public boolean ignoreRuntimeFilterError = false;
+
@VariableMgr.VarAttr(name = RUNTIME_FILTER_MODE, needForward = true)
private String runtimeFilterMode = "GLOBAL";
@@ -3971,6 +3975,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setOrcTinyStripeThresholdBytes(orcTinyStripeThresholdBytes);
tResult.setOrcMaxMergeDistanceBytes(orcMaxMergeDistanceBytes);
tResult.setOrcOnceMaxReadBytes(orcOnceMaxReadBytes);
+ tResult.setIgnoreRuntimeFilterError(ignoreRuntimeFilterError);
return tResult;
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 29fecc27539..392aa8658df 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -356,6 +356,8 @@ struct TQueryOptions {
138: optional i64 orc_tiny_stripe_threshold_bytes = 8388608;
139: optional i64 orc_once_max_read_bytes = 8388608;
140: optional i64 orc_max_merge_distance_bytes = 1048576;
+
+ 141: optional bool ignore_runtime_filter_error = false;
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]