This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 72a08d51374 [minor](retry) Increase rpc retries in query callback
(#49635)
72a08d51374 is described below
commit 72a08d51374b1400cf0b842fdb7b5ae101114318
Author: Gabriel <[email protected]>
AuthorDate: Mon Mar 31 10:16:12 2025 +0800
[minor](retry) Increase rpc retries in query callback (#49635)
When a query finishes, it reports its state to the FE via RPC. If that
report fails because of a network error, the query is considered failed.
This PR adds retries to this reporting step in the callback.
---
be/src/runtime/fragment_mgr.cpp | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 876d221284a..48078411ae7 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -371,10 +371,19 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
// External query (flink/spark read tablets) not need to report to FE.
return;
}
+ int callback_retries = 10;
+ const int sleep_ms = 1000;
Status exec_status = req.status;
Status coord_status;
- FrontendServiceConnection coord(_exec_env->frontend_client_cache(),
req.coord_addr,
- &coord_status);
+ std::unique_ptr<FrontendServiceConnection> coord = nullptr;
+ do {
+ coord =
std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
+ req.coord_addr,
&coord_status);
+ if (!coord_status.ok()) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
+ }
+ } while (!coord_status.ok() && callback_retries-- > 0);
+
if (!coord_status.ok()) {
std::stringstream ss;
UniqueId uid(req.query_id.hi, req.query_id.lo);
@@ -570,21 +579,21 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
}
try {
try {
- coord->reportExecStatus(res, params);
+ (*coord)->reportExecStatus(res, params);
} catch ([[maybe_unused]] TTransportException& e) {
#ifndef ADDRESS_SANITIZER
LOG(WARNING) << "Retrying ReportExecStatus. query id: " <<
print_id(req.query_id)
<< ", instance id: " <<
print_id(req.fragment_instance_id) << " to "
<< req.coord_addr << ", err: " << e.what();
#endif
- rpc_status = coord.reopen();
+ rpc_status = coord->reopen();
if (!rpc_status.ok()) {
// we need to cancel the execution of this fragment
req.cancel_fn(rpc_status);
return;
}
- coord->reportExecStatus(res, params);
+ (*coord)->reportExecStatus(res, params);
}
rpc_status = Status::create<false>(res.status);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]