This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8c59e16f811 [opt](query cancel) optimization for query cancel #28778
8c59e16f811 is described below
commit 8c59e16f811cd1266f4ea7e41bf8ddcfaea38574
Author: zhiqiang <[email protected]>
AuthorDate: Fri Dec 22 12:48:37 2023 +0800
[opt](query cancel) optimization for query cancel #28778
---
be/src/agent/heartbeat_server.cpp | 5 +++++
be/src/runtime/exec_env.cpp | 37 +++++++++++++++----------------------
be/src/runtime/exec_env.h | 1 +
be/src/runtime/fragment_mgr.cpp | 13 ++++++++++++-
be/src/runtime/query_context.h | 4 +++-
be/src/util/debug_util.cpp | 1 -
6 files changed, 36 insertions(+), 25 deletions(-)
diff --git a/be/src/agent/heartbeat_server.cpp
b/be/src/agent/heartbeat_server.cpp
index a0714ac2c5c..b7efbe6796f 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -220,6 +220,11 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo&
master_info) {
if (master_info.__isset.frontend_infos) {
ExecEnv::GetInstance()->update_frontends(master_info.frontend_infos);
+ } else {
+ LOG_EVERY_N(WARNING, 2) << fmt::format(
+ "Heartbeat from {}:{} does not have frontend_infos, this may
because we are "
+ "upgrading cluster",
+ master_info.network_address.hostname,
master_info.network_address.port);
}
if (need_report) {
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 7115dd76f13..71aee1c5b47 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -18,11 +18,13 @@
#include "runtime/exec_env.h"
#include <gen_cpp/HeartbeatService_types.h>
+#include <glog/logging.h>
#include <mutex>
#include <utility>
#include "common/config.h"
+#include "common/logging.h"
#include "olap/olap_define.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
@@ -119,34 +121,25 @@ std::map<TNetworkAddress, FrontendInfo>
ExecEnv::get_running_frontends() {
const auto now = GetCurrentTimeMicros() / 1000;
for (const auto& pair : _frontends) {
- if (pair.second.info.process_uuid != 0) {
- if (now - pair.second.last_reveiving_time_ms < expired_duration) {
+ auto& brpc_addr = pair.first;
+ auto& fe_info = pair.second;
+
+ if (fe_info.info.process_uuid == 0) {
+ // FE is in an unknown state; conservatively regard it as alive.
+ res[brpc_addr] = fe_info;
+ } else {
+ if (now - fe_info.last_reveiving_time_ms < expired_duration) {
// If fe info has just been updated within the last expired_duration,
regard it as running.
- res[pair.first] = pair.second;
+ res[brpc_addr] = fe_info;
} else {
// Fe info has not been updated for more than expired_duration,
regard it as abnormal.
// Abnormal means this fe can not connect to master, and it is
not dropped from cluster.
// or fe do not have master yet.
- LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info)
- << " has not update its hb "
- << "for more than " <<
config::fe_expire_duration_seconds
- << " secs, regard it as abnormal.";
+ LOG_EVERY_N(WARNING, 50) << fmt::format(
+ "Frontend {}:{} has not update its hb for more than {}
secs, regard it as "
+ "abnormal",
+ brpc_addr.hostname, brpc_addr.port,
config::fe_expire_duration_seconds);
}
-
- continue;
- }
-
- if (pair.second.last_reveiving_time_ms -
pair.second.first_receiving_time_ms >
- expired_duration) {
- // A zero process-uuid that sustains more than 60 seconds(default).
- // We will regard this fe as a abnormal frontend.
- LOG(INFO) << "Frontend " << PrintFrontendInfo(pair.second.info)
- << " has not update its hb "
- << "for more than " << config::fe_expire_duration_seconds
- << " secs, regard it as abnormal.";
- continue;
- } else {
- res[pair.first] = pair.second;
}
}
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 95c4e97e1b3..4f4325a79c7 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -349,6 +349,7 @@ private:
std::shared_ptr<WalManager> _wal_manager;
std::mutex _frontends_lock;
+ // ip:brpc_port -> frontend_info
std::map<TNetworkAddress, FrontendInfo> _frontends;
GroupCommitMgr* _group_commit_mgr = nullptr;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c54786e5428..65e1ba475ae 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1167,6 +1167,8 @@ void FragmentMgr::cancel_worker() {
} else {
for (const auto& q : _query_ctx_map) {
if (q.second->get_fe_process_uuid() == 0) {
+ // zero means this query is from an older version fe or
+ // this fe is starting
continue;
}
@@ -1175,7 +1177,16 @@ void FragmentMgr::cancel_worker() {
if (q.second->get_fe_process_uuid() ==
itr->second.info.process_uuid ||
itr->second.info.process_uuid == 0) {
continue;
+ } else {
+ LOG_WARNING("Coordinator of query {} restarted,
going to cancel it.",
+ print_id(q.second->query_id()));
}
+ } else {
+ LOG_WARNING(
+ "Could not find target coordinator {}:{} of
query {}, going to "
+ "cancel it.",
+ q.second->coord_addr.hostname,
q.second->coord_addr.port,
+ print_id(q.second->query_id()));
}
// Coordinator of this query is already dead.
@@ -1195,7 +1206,7 @@ void FragmentMgr::cancel_worker() {
if (!queries_to_cancel.empty()) {
LOG(INFO) << "There are " << queries_to_cancel.size()
- << " queries need to be cancelled, coordinator dead.";
+ << " queries need to be cancelled, coordinator dead or
restarted.";
}
for (const auto& qid : queries_to_cancel) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 6d392c56175..a230fd653e8 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -184,7 +184,9 @@ public:
return _query_options.be_exec_version;
}
- [[nodiscard]] int64_t get_fe_process_uuid() const { return
_query_options.fe_process_uuid; }
+ [[nodiscard]] int64_t get_fe_process_uuid() const {
+ return _query_options.__isset.fe_process_uuid ?
_query_options.fe_process_uuid : 0;
+ }
RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get();
}
diff --git a/be/src/util/debug_util.cpp b/be/src/util/debug_util.cpp
index 85f2d482946..37243a6935b 100644
--- a/be/src/util/debug_util.cpp
+++ b/be/src/util/debug_util.cpp
@@ -125,7 +125,6 @@ std::string PrintFrontendInfos(const
std::vector<TFrontendInfo>& fe_infos) {
std::string PrintFrontendInfo(const TFrontendInfo& fe_info) {
std::stringstream ss;
fe_info.printTo(ss);
-
return ss.str();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]