This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 168f75ebf52 [Improvement](runtime-filter) enlarge
merge_filter_callback timeout_ms (#42602) (#44593)
168f75ebf52 is described below
commit 168f75ebf52b57830dd5d58a0a39a0e67a0192c2
Author: Pxl <[email protected]>
AuthorDate: Tue Nov 26 17:46:43 2024 +0800
[Improvement](runtime-filter) enlarge merge_filter_callback timeout_ms
(#42602) (#44593)
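Sometimes the merge_filter_callback RPC fails with
"[E1008]Reached timeout=1000ms". The old code passed the query's execution
timeout (a value in seconds) directly to set_timeout_ms(), so the effective
RPC timeout was far too small. Like the other runtime-filter RPC requests,
this timeout should be set larger.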
---
be/src/common/config.cpp | 4 ++--
be/src/common/config.h | 3 ++-
be/src/exprs/runtime_filter.cpp | 13 +++++++++++--
be/src/pipeline/exec/exchange_sink_buffer.cpp | 4 ++--
be/src/runtime/runtime_filter_mgr.cpp | 13 +++++++++++--
be/src/runtime/runtime_state.h | 5 +++++
be/src/vec/sink/vdata_stream_sender.cpp | 2 +-
7 files changed, 34 insertions(+), 10 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index fa5fde4ea41..a0949c32684 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -558,7 +558,6 @@ DEFINE_mInt32(streaming_load_rpc_max_alive_time_sec,
"1200");
DEFINE_Int32(tablet_writer_open_rpc_timeout_sec, "60");
// You can ignore brpc error '[E1011]The server is overcrowded' when writing
data.
DEFINE_mBool(tablet_writer_ignore_eovercrowded, "true");
-DEFINE_mBool(exchange_sink_ignore_eovercrowded, "true");
DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
// Whether to enable stream load record function, the default is false.
// False: disable stream load record
@@ -934,7 +933,8 @@ DEFINE_mInt64(small_column_size_buffer, "100");
// rf will decide whether the next sampling_frequency blocks need to be
filtered based on the filtering rate of the current block.
DEFINE_mInt32(runtime_filter_sampling_frequency, "64");
-
+DEFINE_mInt32(execution_max_rpc_timeout_sec, "3600");
+DEFINE_mBool(execution_ignore_eovercrowded, "true");
// cooldown task configs
DEFINE_Int32(cooldown_thread_num, "5");
DEFINE_mInt64(generate_cooldown_task_interval_sec, "20");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f72f02c3a15..86a6b11a2ae 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -603,7 +603,6 @@ DECLARE_mInt32(streaming_load_rpc_max_alive_time_sec);
DECLARE_Int32(tablet_writer_open_rpc_timeout_sec);
// You can ignore brpc error '[E1011]The server is overcrowded' when writing
data.
DECLARE_mBool(tablet_writer_ignore_eovercrowded);
-DECLARE_mBool(exchange_sink_ignore_eovercrowded);
DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec);
// Whether to enable stream load record function, the default is false.
// False: disable stream load record
@@ -983,6 +982,8 @@ DECLARE_mInt64(big_column_size_buffer);
DECLARE_mInt64(small_column_size_buffer);
DECLARE_mInt32(runtime_filter_sampling_frequency);
+DECLARE_mInt32(execution_max_rpc_timeout_sec);
+DECLARE_mBool(execution_ignore_eovercrowded);
// cooldown task configs
DECLARE_Int32(cooldown_thread_num);
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 53c0cc45e40..78d13aac279 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1158,7 +1158,11 @@ Status IRuntimeFilter::send_filter_size(RuntimeState*
state, uint64_t local_filt
request->set_filter_size(local_filter_size);
request->set_filter_id(_filter_id);
- callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout())
* 1000);
+
+
callback->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(state->execution_timeout()));
+ if (config::execution_ignore_eovercrowded) {
+ callback->cntl_->ignore_eovercrowded();
+ }
stub->send_filter_size(closure->cntl_.get(), closure->request_.get(),
closure->response_.get(),
closure.get());
@@ -1196,7 +1200,12 @@ Status IRuntimeFilter::push_to_remote(const
TNetworkAddress* addr, uint64_t loca
merge_filter_request->set_local_merge_time(local_merge_time);
auto column_type = _wrapper->column_type();
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));
-
merge_filter_callback->cntl_->set_timeout_ms(_state->get_query_ctx()->execution_timeout());
+
+ merge_filter_callback->cntl_->set_timeout_ms(
+
get_execution_rpc_timeout_ms(_state->get_query_ctx()->execution_timeout()));
+ if (config::execution_ignore_eovercrowded) {
+ merge_filter_callback->cntl_->ignore_eovercrowded();
+ }
if (get_ignored()) {
merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER);
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 06971094c5e..1753991fe52 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -232,7 +232,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
auto send_callback = request.channel->get_send_callback(id,
request.eos);
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
- if (config::exchange_sink_ignore_eovercrowded) {
+ if (config::execution_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
}
send_callback->addFailedHandler([&, weak_task_ctx =
weak_task_exec_ctx()](
@@ -309,7 +309,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
auto send_callback = request.channel->get_send_callback(id,
request.eos);
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
- if (config::exchange_sink_ignore_eovercrowded) {
+ if (config::execution_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
}
send_callback->addFailedHandler([&, weak_task_ctx =
weak_task_exec_ctx()](
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 42eacf3a924..7a6b9eb988d 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -29,6 +29,7 @@
#include <string>
#include <utility>
+#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "exprs/bloom_filter_func.h"
@@ -344,7 +345,10 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
pquery_id->set_hi(_state->get_query_ctx()->query_id().hi);
pquery_id->set_lo(_state->get_query_ctx()->query_id().lo);
closure->cntl_->set_timeout_ms(
- std::min(3600,
_state->get_query_ctx()->execution_timeout()) * 1000);
+
get_execution_rpc_timeout_ms(_state->get_query_ctx()->execution_timeout()));
+ if (config::execution_ignore_eovercrowded) {
+ closure->cntl_->ignore_eovercrowded();
+ }
closure->request_->set_filter_id(filter_id);
closure->request_->set_filter_size(cnt_val->global_size);
@@ -453,8 +457,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
if (has_attachment) {
closure->cntl_->request_attachment().append(request_attachment);
}
+
closure->cntl_->set_timeout_ms(
- std::min(3600,
_state->get_query_ctx()->execution_timeout()) * 1000);
+
get_execution_rpc_timeout_ms(_state->get_query_ctx()->execution_timeout()));
+ if (config::execution_ignore_eovercrowded) {
+ closure->cntl_->ignore_eovercrowded();
+ }
+
// set fragment-id
if (target.__isset.target_fragment_ids) {
for (auto& target_fragment_id : target.target_fragment_ids) {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 782008ec075..9955bbd4d36 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -38,6 +38,7 @@
#include "agent/be_exec_version_manager.h"
#include "cctz/time_zone.h"
#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
#include "common/factory_creator.h"
#include "common/status.h"
#include "gutil/integral_types.h"
@@ -51,6 +52,10 @@
namespace doris {
class IRuntimeFilter;
+inline int32_t get_execution_rpc_timeout_ms(int32_t execution_timeout_sec) {
+ return std::min(config::execution_max_rpc_timeout_sec,
execution_timeout_sec) * 1000;
+}
+
namespace pipeline {
class PipelineXLocalStateBase;
class PipelineXSinkLocalStateBase;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index fc11d254061..d5bd2167354 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -96,7 +96,7 @@ Status Channel::open(RuntimeState* state) {
}
_be_number = state->be_number();
- _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
+ _brpc_timeout_ms =
get_execution_rpc_timeout_ms(state->execution_timeout());
_serializer.set_is_local(_is_local);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]