This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 035027f8315 [fix](query cancel) Fix query is cancelled when it comes
from follower FE #37662 (#37707)
035027f8315 is described below
commit 035027f8315ff26a1c5168e71f43fb26f14edc68
Author: zhiqiang <[email protected]>
AuthorDate: Fri Jul 12 15:50:45 2024 +0800
[fix](query cancel) Fix query is cancelled when it comes from follower FE
#37662 (#37707)
cherry pick from #37662
---
be/src/runtime/fragment_mgr.cpp | 42 +++++++++++++++++++++++++++++++++++------
1 file changed, 36 insertions(+), 6 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index fee9d51afba..c2f16e1d05a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -37,6 +37,7 @@
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/transport/TTransportException.h>
+#include <algorithm>
#include <atomic>
#include "common/status.h"
@@ -1173,21 +1174,50 @@ void FragmentMgr::cancel_worker() {
continue;
}
- auto itr = running_fes.find(q.second->coord_addr);
+ auto query_context = q.second;
+
+ auto itr = running_fes.find(query_context->coord_addr);
if (itr != running_fes.end()) {
- if (q.second->get_fe_process_uuid() ==
itr->second.info.process_uuid ||
+ if (query_context->get_fe_process_uuid() ==
itr->second.info.process_uuid ||
itr->second.info.process_uuid == 0) {
continue;
} else {
- LOG_WARNING("Coordinator of query {} restarted,
going to cancel it.",
- print_id(q.second->query_id()));
+ // In some rare cases, the rpc port of follower is
not updated in time,
+ // then the port of this follower will be zero,
but actually it is still running,
+ // and be has already received the query from
follower.
+ // So we need to check if host is in running_fes.
+ bool fe_host_is_standing = std::any_of(
+ running_fes.begin(), running_fes.end(),
+ [query_context](const auto& fe) {
+ return fe.first.hostname ==
+
query_context->coord_addr.hostname &&
+ fe.first.port == 0;
+ });
+ if (fe_host_is_standing) {
+ LOG_WARNING(
+ "Coordinator {}:{} is not found, but
its host is still "
+ "running with an unstable brpc port,
not going to cancel "
+ "it.",
+ query_context->coord_addr.hostname,
+ query_context->coord_addr.port,
+ print_id(query_context->query_id()));
+ continue;
+ } else {
+ LOG_WARNING(
+ "Could not find target coordinator
{}:{} of query {}, "
+ "going to "
+ "cancel it.",
+ query_context->coord_addr.hostname,
+ query_context->coord_addr.port,
+ print_id(query_context->query_id()));
+ }
}
} else {
LOG_WARNING(
"Could not find target coordinator {}:{} of
query {}, going to "
"cancel it.",
- q.second->coord_addr.hostname,
q.second->coord_addr.port,
- print_id(q.second->query_id()));
+ query_context->coord_addr.hostname,
query_context->coord_addr.port,
+ print_id(query_context->query_id()));
}
// Coorninator of this query has already dead.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]