This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch cp_0703_2 in repository https://gitbox.apache.org/repos/asf/doris.git
commit d9a46a5ec909c48011222d79d2edbdd3b2ff6951 Author: Pxl <[email protected]> AuthorDate: Tue Jul 2 14:16:39 2024 +0800 [Chore](runtime-filter) enlarge sync filter size rpc timeout limit (#37103) ## Proposed changes enlarge sync filter size rpc timeout limit rf will failed when rpc timeout, so we need enlarge limit ``` sync filter size meet error, filter: RuntimeFilter: (id = 3, type = in_or_bloomfilter, need_local_merge: false, is_broadcast: false, build_bf_cardinality: true ``` --- be/src/common/config.cpp | 1 + be/src/exprs/runtime_filter.cpp | 4 ++-- be/src/exprs/runtime_filter.h | 2 +- be/src/exprs/runtime_filter_slots.h | 2 +- regression-test/suites/query_p0/join/test_join5.groovy | 1 + 5 files changed, 6 insertions(+), 4 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index fe811165c17..f276487d152 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -246,6 +246,7 @@ DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400"); // default thrift client connect timeout(in seconds) DEFINE_mInt32(thrift_connect_timeout_seconds, "3"); DEFINE_mInt32(fetch_rpc_timeout_seconds, "30"); + // default thrift client retry interval (in milliseconds) DEFINE_mInt64(thrift_client_retry_interval_ms, "1000"); // max message size of thrift request diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 1271ec39156..3f8a19f1b16 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1054,7 +1054,7 @@ public: : Base(req, callback), _dependency(std::move(dependency)) {} }; -Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) { +Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) { DCHECK(is_producer()); if (_need_local_merge) { @@ -1105,7 +1105,7 @@ Status IRuntimeFilter::send_filter_size(uint64_t local_filter_size) { request->set_filter_size(local_filter_size); request->set_filter_id(_filter_id); - callback->cntl_->set_timeout_ms(wait_time_ms()); + callback->cntl_->set_timeout_ms(std::min(3600, state->execution_timeout()) * 1000); stub->send_filter_size(closure->cntl_.get(), closure->request_.get(), closure->response_.get(), closure.get()); diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index ee6897be322..e8c5bbfd872 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -230,7 +230,7 @@ public: // push filter to remote node or push down it to scan_node Status publish(bool publish_local = false); - Status send_filter_size(uint64_t local_filter_size); + Status send_filter_size(RuntimeState* state, uint64_t local_filter_size); RuntimeFilterType type() const { return _runtime_filter_type; } diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index b5b04a1ebac..0bf8a33f9f2 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -55,7 +55,7 @@ public: // send_filter_size may call dependency->sub(), so we call set_dependency firstly for all rf to avoid dependency set_ready repeatedly for (auto* runtime_filter : _runtime_filters) { if (runtime_filter->need_sync_filter_size()) { - RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size)); + RETURN_IF_ERROR(runtime_filter->send_filter_size(state, hash_table_size)); } } return Status::OK(); diff --git a/regression-test/suites/query_p0/join/test_join5.groovy b/regression-test/suites/query_p0/join/test_join5.groovy index 62be496372d..4323575870f 100644 --- a/regression-test/suites/query_p0/join/test_join5.groovy +++ b/regression-test/suites/query_p0/join/test_join5.groovy @@ -16,6 +16,7 @@ // under the License. suite("test_join5", "query,p0") { + sql "set runtime_filter_wait_time_ms = 5" def DBname = "regression_test_join5" sql "DROP DATABASE IF EXISTS ${DBname}" sql "CREATE DATABASE IF NOT EXISTS ${DBname}" --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
