This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch cp_1126_3 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2f8b9bdb21d8bdbf5de6f7c0a6307d2a115b27eb Author: Pxl <[email protected]> AuthorDate: Thu Oct 31 14:34:10 2024 +0800 [Improvement](runtime-filter) enlarge merge_filter_callback timeout_ms (#42602) We sometimes hit [E1008]Reached timeout=1000ms on this RPC. As with the other runtime-filter (rf) related RPC requests, its timeout should be set larger. --- be/src/common/config.cpp | 4 ++-- be/src/common/config.h | 3 ++- be/src/exprs/runtime_filter.cpp | 18 +++++++++++++----- be/src/pipeline/exec/exchange_sink_buffer.cpp | 4 ++-- be/src/runtime/runtime_filter_mgr.cpp | 17 ++++++++++++++++- be/src/runtime/runtime_state.h | 5 +++++ be/src/vec/sink/vdata_stream_sender.cpp | 2 +- 7 files changed, 41 insertions(+), 12 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c41f19a3e27..e6d68b50eb6 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -571,7 +571,6 @@ DEFINE_mInt32(streaming_load_rpc_max_alive_time_sec, "1200"); DEFINE_Int32(tablet_writer_open_rpc_timeout_sec, "60"); // You can ignore brpc error '[E1011]The server is overcrowded' when writing data. DEFINE_mBool(tablet_writer_ignore_eovercrowded, "true"); -DEFINE_mBool(exchange_sink_ignore_eovercrowded, "true"); DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60"); // Whether to enable stream load record function, the default is false. // False: disable stream load record @@ -960,7 +959,8 @@ DEFINE_mInt64(small_column_size_buffer, "100"); // When the rows number reached this limit, will check the filter rate the of bloomfilter // if it is lower than a specific threshold, the predicate will be disabled.
DEFINE_mInt32(rf_predicate_check_row_num, "204800"); - +DEFINE_mInt32(execution_max_rpc_timeout_sec, "3600"); +DEFINE_mBool(execution_ignore_eovercrowded, "true"); // cooldown task configs DEFINE_Int32(cooldown_thread_num, "5"); DEFINE_mInt64(generate_cooldown_task_interval_sec, "20"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 7693af0f7ae..a0b2116de81 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -617,7 +617,6 @@ DECLARE_mInt32(streaming_load_rpc_max_alive_time_sec); DECLARE_Int32(tablet_writer_open_rpc_timeout_sec); // You can ignore brpc error '[E1011]The server is overcrowded' when writing data. DECLARE_mBool(tablet_writer_ignore_eovercrowded); -DECLARE_mBool(exchange_sink_ignore_eovercrowded); DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec); // Whether to enable stream load record function, the default is false. // False: disable stream load record @@ -1010,6 +1009,8 @@ DECLARE_mInt64(small_column_size_buffer); // When the rows number reached this limit, will check the filter rate the of bloomfilter // if it is lower than a specific threshold, the predicate will be disabled. 
DECLARE_mInt32(rf_predicate_check_row_num); +DECLARE_mInt32(execution_max_rpc_timeout_sec); +DECLARE_mBool(execution_ignore_eovercrowded); // cooldown task configs DECLARE_Int32(cooldown_thread_num); diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index b2817e291ca..8252d4de80b 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1123,8 +1123,11 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt request->set_filter_size(local_filter_size); request->set_filter_id(_filter_id); - callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) * 1000); - callback->cntl_->ignore_eovercrowded(); + + callback->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(state->execution_timeout())); + if (config::execution_ignore_eovercrowded) { + callback->cntl_->ignore_eovercrowded(); + } stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), closure.get()); @@ -1161,9 +1164,14 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr, bool opt_remo merge_filter_request->set_opt_remote_rf(opt_remote_rf); merge_filter_request->set_is_pipeline(_state->enable_pipeline_exec); auto column_type = _wrapper->column_type(); - merge_filter_request->set_column_type(to_proto(column_type)); - merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms()); - merge_filter_callback->cntl_->ignore_eovercrowded(); + + RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type))); + + merge_filter_callback->cntl_->set_timeout_ms( + get_execution_rpc_timeout_ms(_state->execution_timeout)); + if (config::execution_ignore_eovercrowded) { + merge_filter_callback->cntl_->ignore_eovercrowded(); + } if (get_ignored()) { merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index af66eedc1e0..58fb5581b8c 
100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -274,7 +274,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) { _instance_to_rpc_ctx[id].is_cancelled = false; send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); - if (config::exchange_sink_ignore_eovercrowded) { + if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); } send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()]( @@ -355,7 +355,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) { _instance_to_rpc_ctx[id] = rpc_ctx; send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms); - if (config::exchange_sink_ignore_eovercrowded) { + if (config::execution_ignore_eovercrowded) { send_callback->cntl_->ignore_eovercrowded(); } send_callback->addFailedHandler([&, weak_task_ctx = weak_task_exec_ctx()]( diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 640cece8fb3..3bee77e06c4 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -30,6 +30,7 @@ #include <string> #include <utility> +#include "common/config.h" #include "common/logging.h" #include "common/status.h" #include "exprs/bloom_filter_func.h" @@ -364,7 +365,10 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz auto* pquery_id = closure->request_->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); - closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000); + closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(_state->execution_timeout)); + if (config::execution_ignore_eovercrowded) { + closure->cntl_->ignore_eovercrowded(); + } closure->request_->set_filter_id(filter_id); closure->request_->set_filter_size(cnt_val->global_size); @@ -515,6 +519,17 @@ Status 
RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ if (auto status = Status::create(rpc_context->response.status()); !status) { return status; } + } + closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(_state->execution_timeout)); + if (config::execution_ignore_eovercrowded) { + closure->cntl_->ignore_eovercrowded(); + } + + // set fragment-id + if (target.__isset.target_fragment_ids) { + for (auto& target_fragment_id : target.target_fragment_ids) { + closure->request_->add_fragment_ids(target_fragment_id); + } if (rpc_context->cntl.Failed()) { LOG(WARNING) << "runtimefilter rpc err:" << rpc_context->cntl.ErrorText(); ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index c9813108267..3cba160cef4 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -36,6 +36,7 @@ #include "cctz/time_zone.h" #include "common/compiler_util.h" // IWYU pragma: keep +#include "common/config.h" #include "common/factory_creator.h" #include "common/status.h" #include "gutil/integral_types.h" @@ -47,6 +48,10 @@ namespace doris { class IRuntimeFilter; +inline int32_t get_execution_rpc_timeout_ms(int32_t execution_timeout_sec) { + return std::min(config::execution_max_rpc_timeout_sec, execution_timeout_sec) * 1000; +} + namespace pipeline { class PipelineXLocalStateBase; class PipelineXSinkLocalStateBase; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index f4efa3aec02..a82c92a1279 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -184,7 +184,7 @@ Status Channel<Parent>::open(RuntimeState* state) { _brpc_request->set_sender_id(_parent->sender_id()); _brpc_request->set_be_number(_be_number); - _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000; + _brpc_timeout_ms = get_execution_rpc_timeout_ms(state->execution_timeout()); 
_serializer.set_is_local(_is_local); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
