This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 168f75ebf52 [Improvement](runtime-filter) enlarge merge_filter_callback timeout_ms #42602 (#44593)
168f75ebf52 is described below

commit 168f75ebf52b57830dd5d58a0a39a0e67a0192c2
Author: Pxl <[email protected]>
AuthorDate: Tue Nov 26 17:46:43 2024 +0800

    [Improvement](runtime-filter) enlarge merge_filter_callback timeout_ms #42602 (#44593)
    
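    The runtime-filter merge RPC (merge_filter_callback) sometimes fails with
    `[E1008]Reached timeout=1000ms`. Like other runtime-filter (RF) RPC
    requests, it should use a larger timeout.
    
    All execution RPC timeouts now go through get_execution_rpc_timeout_ms(),
    which caps the query execution timeout at the new BE config
    `execution_max_rpc_timeout_sec` (default 3600). The BE config
    `exchange_sink_ignore_eovercrowded` is replaced by
    `execution_ignore_eovercrowded`, which now also applies to RF RPCs.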
---
 be/src/common/config.cpp                      |  4 ++--
 be/src/common/config.h                        |  3 ++-
 be/src/exprs/runtime_filter.cpp               | 13 +++++++++++--
 be/src/pipeline/exec/exchange_sink_buffer.cpp |  4 ++--
 be/src/runtime/runtime_filter_mgr.cpp         | 13 +++++++++++--
 be/src/runtime/runtime_state.h                |  5 +++++
 be/src/vec/sink/vdata_stream_sender.cpp       |  2 +-
 7 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index fa5fde4ea41..a0949c32684 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -558,7 +558,6 @@ DEFINE_mInt32(streaming_load_rpc_max_alive_time_sec, 
"1200");
 DEFINE_Int32(tablet_writer_open_rpc_timeout_sec, "60");
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 DEFINE_mBool(tablet_writer_ignore_eovercrowded, "true");
-DEFINE_mBool(exchange_sink_ignore_eovercrowded, "true");
 DEFINE_mInt32(slave_replica_writer_rpc_timeout_sec, "60");
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
@@ -934,7 +933,8 @@ DEFINE_mInt64(small_column_size_buffer, "100");
 
 // rf will decide whether the next sampling_frequency blocks need to be 
filtered based on the filtering rate of the current block.
 DEFINE_mInt32(runtime_filter_sampling_frequency, "64");
-
+DEFINE_mInt32(execution_max_rpc_timeout_sec, "3600");
+DEFINE_mBool(execution_ignore_eovercrowded, "true");
 // cooldown task configs
 DEFINE_Int32(cooldown_thread_num, "5");
 DEFINE_mInt64(generate_cooldown_task_interval_sec, "20");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index f72f02c3a15..86a6b11a2ae 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -603,7 +603,6 @@ DECLARE_mInt32(streaming_load_rpc_max_alive_time_sec);
 DECLARE_Int32(tablet_writer_open_rpc_timeout_sec);
 // You can ignore brpc error '[E1011]The server is overcrowded' when writing 
data.
 DECLARE_mBool(tablet_writer_ignore_eovercrowded);
-DECLARE_mBool(exchange_sink_ignore_eovercrowded);
 DECLARE_mInt32(slave_replica_writer_rpc_timeout_sec);
 // Whether to enable stream load record function, the default is false.
 // False: disable stream load record
@@ -983,6 +982,8 @@ DECLARE_mInt64(big_column_size_buffer);
 DECLARE_mInt64(small_column_size_buffer);
 
 DECLARE_mInt32(runtime_filter_sampling_frequency);
+DECLARE_mInt32(execution_max_rpc_timeout_sec);
+DECLARE_mBool(execution_ignore_eovercrowded);
 
 // cooldown task configs
 DECLARE_Int32(cooldown_thread_num);
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 53c0cc45e40..78d13aac279 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1158,7 +1158,11 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* 
state, uint64_t local_filt
 
     request->set_filter_size(local_filter_size);
     request->set_filter_id(_filter_id);
-    callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) 
* 1000);
+
+    
callback->cntl_->set_timeout_ms(get_execution_rpc_timeout_ms(state->execution_timeout()));
+    if (config::execution_ignore_eovercrowded) {
+        callback->cntl_->ignore_eovercrowded();
+    }
 
     stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), 
closure->response_.get(),
                            closure.get());
@@ -1196,7 +1200,12 @@ Status IRuntimeFilter::push_to_remote(const 
TNetworkAddress* addr, uint64_t loca
     merge_filter_request->set_local_merge_time(local_merge_time);
     auto column_type = _wrapper->column_type();
     
RETURN_IF_CATCH_EXCEPTION(merge_filter_request->set_column_type(to_proto(column_type)));
-    
merge_filter_callback->cntl_->set_timeout_ms(_state->get_query_ctx()->execution_timeout());
+
+    merge_filter_callback->cntl_->set_timeout_ms(
+            
get_execution_rpc_timeout_ms(_state->get_query_ctx()->execution_timeout()));
+    if (config::execution_ignore_eovercrowded) {
+        merge_filter_callback->cntl_->ignore_eovercrowded();
+    }
 
     if (get_ignored()) {
         merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER);
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 06971094c5e..1753991fe52 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -232,7 +232,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         auto send_callback = request.channel->get_send_callback(id, 
request.eos);
 
         
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
-        if (config::exchange_sink_ignore_eovercrowded) {
+        if (config::execution_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
         send_callback->addFailedHandler([&, weak_task_ctx = 
weak_task_exec_ctx()](
@@ -309,7 +309,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         }
         auto send_callback = request.channel->get_send_callback(id, 
request.eos);
         
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
-        if (config::exchange_sink_ignore_eovercrowded) {
+        if (config::execution_ignore_eovercrowded) {
             send_callback->cntl_->ignore_eovercrowded();
         }
         send_callback->addFailedHandler([&, weak_task_ctx = 
weak_task_exec_ctx()](
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 42eacf3a924..7a6b9eb988d 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -29,6 +29,7 @@
 #include <string>
 #include <utility>
 
+#include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "exprs/bloom_filter_func.h"
@@ -344,7 +345,10 @@ Status 
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
             pquery_id->set_hi(_state->get_query_ctx()->query_id().hi);
             pquery_id->set_lo(_state->get_query_ctx()->query_id().lo);
             closure->cntl_->set_timeout_ms(
-                    std::min(3600, 
_state->get_query_ctx()->execution_timeout()) * 1000);
+                    
get_execution_rpc_timeout_ms(_state->get_query_ctx()->execution_timeout()));
+            if (config::execution_ignore_eovercrowded) {
+                closure->cntl_->ignore_eovercrowded();
+            }
 
             closure->request_->set_filter_id(filter_id);
             closure->request_->set_filter_size(cnt_val->global_size);
@@ -453,8 +457,13 @@ Status RuntimeFilterMergeControllerEntity::merge(const 
PMergeFilterRequest* requ
             if (has_attachment) {
                 
closure->cntl_->request_attachment().append(request_attachment);
             }
+
             closure->cntl_->set_timeout_ms(
-                    std::min(3600, 
_state->get_query_ctx()->execution_timeout()) * 1000);
+                    
get_execution_rpc_timeout_ms(_state->get_query_ctx()->execution_timeout()));
+            if (config::execution_ignore_eovercrowded) {
+                closure->cntl_->ignore_eovercrowded();
+            }
+
             // set fragment-id
             if (target.__isset.target_fragment_ids) {
                 for (auto& target_fragment_id : target.target_fragment_ids) {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 782008ec075..9955bbd4d36 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -38,6 +38,7 @@
 #include "agent/be_exec_version_manager.h"
 #include "cctz/time_zone.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/config.h"
 #include "common/factory_creator.h"
 #include "common/status.h"
 #include "gutil/integral_types.h"
@@ -51,6 +52,10 @@
 namespace doris {
 class IRuntimeFilter;
 
+inline int32_t get_execution_rpc_timeout_ms(int32_t execution_timeout_sec) { // RPC timeout in ms: min(config::execution_max_rpc_timeout_sec, execution_timeout_sec) seconds, converted to ms
+    return std::min(config::execution_max_rpc_timeout_sec, 
execution_timeout_sec) * 1000; // NOTE(review): int32 multiply -- a cap/timeout above ~2147483 s would overflow; consider widening to int64_t
+}
+
+
 namespace pipeline {
 class PipelineXLocalStateBase;
 class PipelineXSinkLocalStateBase;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index fc11d254061..d5bd2167354 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -96,7 +96,7 @@ Status Channel::open(RuntimeState* state) {
     }
     _be_number = state->be_number();
 
-    _brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
+    _brpc_timeout_ms = 
get_execution_rpc_timeout_ms(state->execution_timeout());
 
     _serializer.set_is_local(_is_local);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to