This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch cp_1126_3
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2f8b9bdb21d8bdbf5de6f7c0a6307d2a115b27eb
Author: Pxl <[email protected]>
AuthorDate: Thu Oct 31 14:34:10 2024 +0800

    [Improvement](runtime-filter) enlarge merge_filter_callback timeout_ms (#42602)
    
    Sometimes we encounter "[E1008]Reached timeout=1000ms". In that case, as
    with other rf-related RPC requests, the timeout should be set larger.
---
 be/src/common/config.cpp                      |  4 ++--
 be/src/common/config.h                        |  3 ++-
 be/src/exprs/runtime_filter.cpp               | 18 +++++++++++++-----
 be/src/pipeline/exec/exchange_sink_buffer.cpp |  4 ++--
 be/src/runtime/runtime_filter_mgr.cpp         | 17 ++++++++++++++++-
 be/src/runtime/runtime_state.h                |  5 +++++
 be/src/vec/sink/vdata_stream_sender.cpp       |  2 +-
 7 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c41f19a3e27..e6d68b50eb6 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -571,7 +571,6 @@ DEFINE_mInt32(streaming_load_rpc_max_alive_time_sec, 
"1200");
 DEFINE_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 DEFINE_mBool(tablet_writer_ignore_eovercrowded, "true");
-DEFINE_mBool(exchange_sink_ignore_eovercrowded, "true");
 DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
@@ -960,7 +959,8 @@ DEFINE_mInt64(small_column_size_buffer, "100");
 // When the rows number reached this limit, will check the filter rate the of 
bloomfilter
 // if it is lower than a specific threshold, the predicate will be disabled.
 DEFINE_mInt32(rf_predicate_check_row_num, "204800");
-
+DEFINE_mInt32(execution_max_rpc_timeout_sec, "3600");
+DEFINE_mBool(execution_ignore_eovercrowded, "true");
 // cooldown task configs
 DEFINE_Int32(cooldown_thread_num, "5");
 DEFINE_mInt64(generate_cooldown_task_interval_sec, "20");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7693af0f7ae..a0b2116de81 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -617,7 +617,6 @@ DECLARE_mInt32(streaming_load_rpc_max_alive_time_sec);
 DECLARE_Int32(tablet_writer_open_rpc_timeout_sec);
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 DECLARE_mBool(tablet_writer_ignore_eovercrowded);
-DECLARE_mBool(exchange_sink_ignore_eovercrowded);
 DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec);
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
@@ -1010,6 +1009,8 @@ DECLARE_mInt64(small_column_size_buffer);
 // When the rows number reached this limit, will check the filter rate the of 
bloomfilter
 // if it is lower than a specific threshold, the predicate will be disabled.
 DECLARE_mInt32(rf_predicate_check_row_num);
+DECLARE_mInt32(execution_max_rpc_timeout_sec);
+DECLARE_mBool(execution_ignore_eovercrowded);
 
 // cooldown task configs
 DECLARE_Int32(cooldown_thread_num);
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index b2817e291ca..8252d4de80b 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1123,8 +1123,11 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* 
state, uint64_t local_filt
 
     request->set_filter_size(local_filter_size);
     request->set_filter_id(_filter_id);
-    callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) 
* 1000);
-    callback->cntl_->ignore_eovercrowded();
+
+    
callback->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(state->execution_timeout()));
+    if (config::execution_ignore_eovercrowded) {
+        callback->cntl_->ignore_eovercrowded();
+    }
 
     stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), 
closure->response_.get(),
                            closure.get());
@@ -1161,9 +1164,14 @@ Status IRuntimeFilter::push_to_remote(const 
TNetworkAddress* addr, bool opt_remo
     merge_filter_request->set_opt_remote_rf(opt_remote_rf);
     merge_filter_request->set_is_pipeline(_state->enable_pipeline_exec);
     auto column_type = _wrapper->column_type();
-    merge_filter_request->set_column_type(to_proto(column_type));
-    merge_filter_callback->cntl_->set_timeout_ms(wait_time_ms());
-    merge_filter_callback->cntl_->ignore_eovercrowded();
+
+    
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));
+
+    merge_filter_callback->cntl_->set_timeout_ms(
+            get_execution_rpc_timeout_ms(_state->execution_timeout));
+    if (config::execution_ignore_eovercrowded) {
+        merge_filter_callback->cntl_->ignore_eovercrowded();
+    }
 
     if (get_ignored()) {
         merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER);
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index af66eedc1e0..58fb5581b8c 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -274,7 +274,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         _instance_to_rpc_ctx[id].is_cancelled = false;
 
         
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
-        if (config::exchange_sink_ignore_eovercrowded) {
+        if (config::execution_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
         send_callback->addFailedHandler([&, weak_task_ctx = 
weak_task_exec_ctx()](
@@ -355,7 +355,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId 
id) {
         _instance_to_rpc_ctx[id] = rpc_ctx;
 
         
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
-        if (config::exchange_sink_ignore_eovercrowded) {
+        if (config::execution_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
         send_callback->addFailedHandler([&, weak_task_ctx = 
weak_task_exec_ctx()](
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 640cece8fb3..3bee77e06c4 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -30,6 +30,7 @@
 #include <string>
 #include <utility>
 
+#include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "exprs/bloom_filter_func.h"
@@ -364,7 +365,10 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
             auto* pquery_id = closure->request_->mutable_query_id();
             pquery_id->set_hi(_state->query_id.hi());
             pquery_id->set_lo(_state->query_id.lo());
-            closure->cntl_->set_timeout_ms(std::min(3600, 
_state->execution_timeout) * 1000);
+            
closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(_state->execution_timeout));
+            if (config::execution_ignore_eovercrowded) {
+                closure->cntl_->ignore_eovercrowded();
+            }
 
             closure->request_->set_filter_id(filter_id);
             closure->request_->set_filter_size(cnt_val->global_size);
@@ -515,6 +519,17 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
                 if (auto status = 
Status::create(rpc_context->response.status()); !status) {
                     return status;
                 }
+            }
+            
closure->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(_state->execution_timeout));
+            if (config::execution_ignore_eovercrowded) {
+                closure->cntl_->ignore_eovercrowded();
+            }
+
+            // set fragment-id
+            if (target.__isset.target_fragment_ids) {
+                for (auto& target_fragment_id : target.target_fragment_ids) {
+                    closure->request_->add_fragment_ids(target_fragment_id);
+                }
                 if (rpc_context->cntl.Failed()) {
                     LOG(WARNING) << "runtimefilter rpc err:" << 
rpc_context->cntl.ErrorText();
                     
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index c9813108267..3cba160cef4 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -36,6 +36,7 @@
 
 #include "cctz/time_zone.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
 #include "common/factory_creator.h"
 #include "common/status.h"
 #include "gutil/integral_types.h"
@@ -47,6 +48,10 @@
 namespace doris {
 class IRuntimeFilter;
 
+inline int32_t get_execution_rpc_timeout_ms(int32_t execution_timeout_sec) {
+    return std::min(config::execution_max_rpc_timeout_sec, 
execution_timeout_sec) * 1000;
+}
+
 namespace pipeline {
 class PipelineXLocalStateBase;
 class PipelineXSinkLocalStateBase;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index f4efa3aec02..a82c92a1279 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -184,7 +184,7 @@ Status Channel<Parent>::open(RuntimeState* state) {
     _brpc_request->set_sender_id(_parent->sender_id());
     _brpc_request->set_be_number(_be_number);
 
-    _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
+    _brpc_timeout_ms = 
get_execution_rpc_timeout_ms(state->execution_timeout());
 
     _serializer.set_is_local(_is_local);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to