This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit 4dfbd34afdf23991ca7ac2ec96f3f6620771fc67 Author: Maxim Smyatkin <[email protected]> AuthorDate: Fri May 17 15:55:27 2024 +0300 [yagp_hooks_collector] Add nested query tracking Track each query, including nested ones, in a per-query map keyed by the (ccnt, tmid) pair of the query descriptor's gpmon packet; when a query is first seen, ccnt is set to gp_command_count and tmid to the current nesting level. Maintain a state machine per active query to correctly sequence submit→start→end→done across nesting boundaries. Reporting of nested queries can be turned off with the new yagpcc.report_nested_queries GUC (enabled by default). --- protos/yagpcc_metrics.proto | 10 ++- protos/yagpcc_set_service.proto | 32 ++++++++-- src/Config.cpp | 7 ++ src/Config.h | 1 + src/EventSender.cpp | 138 ++++++++++++++++++++++++++++++++-------- src/EventSender.h | 26 +++++++- src/hook_wrappers.cpp | 2 +- 7 files changed, 178 insertions(+), 38 deletions(-) diff --git a/protos/yagpcc_metrics.proto b/protos/yagpcc_metrics.proto index 2d20d3c46d9..68492732ece 100644 --- a/protos/yagpcc_metrics.proto +++ b/protos/yagpcc_metrics.proto @@ -36,6 +36,11 @@ message QueryInfo { string rsgname = 10; } +message AdditionalQueryInfo { + int64 nested_level = 1; + string error_message = 2; +} + enum PlanGenerator { PLAN_GENERATOR_UNSPECIFIED = 0; @@ -95,7 +100,7 @@ message MetricInstrumentation { uint64 nloops = 2; /* # of run cycles for this node */ uint64 tuplecount = 3; /* Tuples emitted so far this cycle */ double firsttuple = 4; /* Time for first tuple of this cycle */ - double startup = 5; /* Total startup time (in seconds) */ + double startup = 5; /* Total startup time (in seconds) (optimiser's cost estimation) */ double total = 6; /* Total total time (in seconds) */ uint64 shared_blks_hit = 7; /* shared blocks stats*/ uint64 shared_blks_read = 8; @@ -105,12 +110,13 @@ message MetricInstrumentation { uint64 local_blks_read = 12; uint64 local_blks_dirtied = 13; uint64 local_blks_written = 14; - uint64 temp_blks_read = 15; /* temporary tables read stat */ + uint64 temp_blks_read = 15; /* temporary tables read stat */ uint64 temp_blks_written = 16; double blk_read_time = 17; /* measured read/write time */ double blk_write_time = 18; 
NetworkStat sent = 19; NetworkStat received = 20; + double startup_time = 21; /* real query startup time (planning + queue time) */ } message SpillInfo { diff --git a/protos/yagpcc_set_service.proto b/protos/yagpcc_set_service.proto index e8fc7aaa99d..0b9e34df49d 100644 --- a/protos/yagpcc_set_service.proto +++ b/protos/yagpcc_set_service.proto @@ -9,17 +9,35 @@ package yagpcc; option java_outer_classname = "SegmentYAGPCCAS"; option go_package = "a.yandex-team.ru/cloud/mdb/yagpcc/api/proto/agent_segment;greenplum"; +service SetQueryInfo { + rpc SetMetricPlanNode (SetPlanNodeReq) returns (MetricResponse) {} + + rpc SetMetricQuery (SetQueryReq) returns (MetricResponse) {} +} + +message MetricResponse { + MetricResponseStatusCode error_code = 1; + string error_text = 2; +} + +enum MetricResponseStatusCode { + METRIC_RESPONSE_STATUS_CODE_UNSPECIFIED = 0; + METRIC_RESPONSE_STATUS_CODE_SUCCESS = 1; + METRIC_RESPONSE_STATUS_CODE_ERROR = 2; +} + message SetQueryReq { - QueryStatus query_status = 1; - google.protobuf.Timestamp datetime = 2; - QueryKey query_key = 3; - SegmentKey segment_key = 4; - QueryInfo query_info = 5; - GPMetrics query_metrics = 6; - repeated MetricPlan plan_tree = 7; + QueryStatus query_status = 1; + google.protobuf.Timestamp datetime = 2; + QueryKey query_key = 3; + SegmentKey segment_key = 4; + QueryInfo query_info = 5; + GPMetrics query_metrics = 6; + repeated MetricPlan plan_tree = 7; google.protobuf.Timestamp submit_time = 8; google.protobuf.Timestamp start_time = 9; google.protobuf.Timestamp end_time = 10; + AdditionalQueryInfo add_info = 11; } message SetPlanNodeReq { diff --git a/src/Config.cpp b/src/Config.cpp index c5c2c15f7e9..1bbad9a6ea3 100644 --- a/src/Config.cpp +++ b/src/Config.cpp @@ -13,6 +13,7 @@ static char *guc_uds_path = nullptr; static bool guc_enable_analyze = true; static bool guc_enable_cdbstats = true; static bool guc_enable_collector = true; +static bool guc_report_nested_queries = true; static char *guc_ignored_users = 
nullptr; static std::unique_ptr<std::unordered_set<std::string>> ignored_users = nullptr; @@ -36,6 +37,11 @@ void Config::init() { &guc_enable_cdbstats, true, PGC_SUSET, GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC, 0LL, 0LL, 0LL); + DefineCustomBoolVariable( + "yagpcc.report_nested_queries", "Collect stats on nested queries", 0LL, + &guc_report_nested_queries, true, PGC_SUSET, + GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC, 0LL, 0LL, 0LL); + DefineCustomStringVariable( "yagpcc.ignored_users_list", "Make yagpcc ignore queries issued by given users", 0LL, @@ -47,6 +53,7 @@ std::string Config::uds_path() { return guc_uds_path; } bool Config::enable_analyze() { return guc_enable_analyze; } bool Config::enable_cdbstats() { return guc_enable_cdbstats; } bool Config::enable_collector() { return guc_enable_collector; } +bool Config::report_nested_queries() { return guc_report_nested_queries; } bool Config::filter_user(const std::string *username) { if (!ignored_users) { diff --git a/src/Config.h b/src/Config.h index 999d0300640..15f425be67c 100644 --- a/src/Config.h +++ b/src/Config.h @@ -10,4 +10,5 @@ public: static bool enable_cdbstats(); static bool enable_collector(); static bool filter_user(const std::string *username); + static bool report_nested_queries(); }; \ No newline at end of file diff --git a/src/EventSender.cpp b/src/EventSender.cpp index 21c2e2117a3..116805d0646 100644 --- a/src/EventSender.cpp +++ b/src/EventSender.cpp @@ -10,6 +10,7 @@ extern "C" { #include "postgres.h" #include "access/hash.h" +#include "access/xact.h" #include "commands/dbcommands.h" #include "commands/explain.h" #include "commands/resgroupcmds.h" @@ -30,11 +31,6 @@ extern "C" { #include "EventSender.h" -#define need_collect() \ - (nesting_level == 0 && gp_command_count != 0 && \ - query_desc->sourceText != nullptr && Config::enable_collector() && \ - !Config::filter_user(get_user_name())) - namespace { std::string *get_user_name() { @@ -146,6 +142,11 @@ void set_query_info(yagpcc::SetQueryReq 
*req, QueryDesc *query_desc) { } } +void set_qi_nesting_level(yagpcc::SetQueryReq *req, int nesting_level) { + auto aqi = req->mutable_add_info(); + aqi->set_nested_level(nesting_level); +} + void set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics, QueryDesc *query_desc) { auto instrument = query_desc->planstate->instrument; @@ -210,6 +211,19 @@ yagpcc::SetQueryReq create_query_req(QueryDesc *query_desc, return req; } +inline bool is_top_level_query(QueryDesc *query_desc, int nesting_level) { + return (query_desc->gpmon_pkt && + query_desc->gpmon_pkt->u.qexec.key.tmid == 0) || + nesting_level == 0; +} + +inline bool need_collect(QueryDesc *query_desc, int nesting_level) { + return (Config::report_nested_queries() || + is_top_level_query(query_desc, nesting_level)) && + gp_command_count != 0 && query_desc->sourceText != nullptr && + Config::enable_collector() && !Config::filter_user(get_user_name()); +} + } // namespace void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) { @@ -223,7 +237,8 @@ void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) { // TODO break; case METRICS_QUERY_SUBMIT: - collect_query_submit(reinterpret_cast<QueryDesc *>(arg)); + // don't collect anything here. We will fake this call in ExecutorStart as + // it really makes no difference. 
Just complicates things break; case METRICS_QUERY_START: // no-op: executor_after_start is enough @@ -232,10 +247,8 @@ void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) { case METRICS_QUERY_ERROR: case METRICS_QUERY_CANCELING: case METRICS_QUERY_CANCELED: - collect_query_done(reinterpret_cast<QueryDesc *>(arg), status); - break; case METRICS_INNER_QUERY_DONE: - // TODO + collect_query_done(reinterpret_cast<QueryDesc *>(arg), status); break; default: ereport(FATAL, (errmsg("Unknown query status: %d", status))); @@ -247,9 +260,10 @@ void EventSender::executor_before_start(QueryDesc *query_desc, if (!connector) { return; } - if (!need_collect()) { + if (!need_collect(query_desc, nesting_level)) { return; } + collect_query_submit(query_desc); query_start_time = std::chrono::high_resolution_clock::now(); WorkfileResetBackendStats(); if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze()) { @@ -273,8 +287,10 @@ void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) { return; } if ((Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) && - need_collect()) { - query_msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START); + need_collect(query_desc, nesting_level)) { + auto *query = get_query_message(query_desc); + update_query_state(query_desc, query, QueryState::START); + auto query_msg = query->message; *query_msg->mutable_start_time() = current_ts(); set_query_plan(query_msg, query_desc); if (connector->report_query(*query_msg, "started")) { @@ -287,7 +303,7 @@ void EventSender::executor_end(QueryDesc *query_desc) { if (!connector) { return; } - if (!need_collect() || + if (!need_collect(query_desc, nesting_level) || (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE)) { return; } @@ -301,7 +317,13 @@ void EventSender::executor_end(QueryDesc *query_desc) { cdbdisp_checkDispatchResult(query_desc->estate->dispatcherState, DISPATCH_WAIT_NONE); }*/ - 
query_msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_END); + auto *query = get_query_message(query_desc); + if (query->state == UNKNOWN && !Config::report_nested_queries()) { + // COMMIT/ROLLBACK of a nested query. Happens in top-level + return; + } + update_query_state(query_desc, query, QueryState::END); + auto query_msg = query->message; *query_msg->mutable_end_time() = current_ts(); set_gp_metrics(query_msg->mutable_query_metrics(), query_desc); if (connector->report_query(*query_msg, "ended")) { @@ -310,15 +332,15 @@ void EventSender::executor_end(QueryDesc *query_desc) { } void EventSender::collect_query_submit(QueryDesc *query_desc) { - if (connector && need_collect()) { - if (query_msg && query_msg->has_query_key()) { - connector->report_query(*query_msg, "previous query"); - query_msg->Clear(); - } + if (connector && need_collect(query_desc, nesting_level)) { + auto *query = get_query_message(query_desc); + query->state = QueryState::SUBMIT; + auto query_msg = query->message; *query_msg = create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_SUBMIT); *query_msg->mutable_submit_time() = current_ts(); set_query_info(query_msg, query_desc); + set_qi_nesting_level(query_msg, query_desc->gpmon_pkt->u.qexec.key.tmid); set_query_text(query_msg, query_desc); if (connector->report_query(*query_msg, "submit")) { clear_big_fields(query_msg); @@ -328,11 +350,12 @@ void EventSender::collect_query_submit(QueryDesc *query_desc) { void EventSender::collect_query_done(QueryDesc *query_desc, QueryMetricsStatus status) { - if (connector && need_collect()) { + if (connector && need_collect(query_desc, nesting_level)) { yagpcc::QueryStatus query_status; std::string msg; switch (status) { case METRICS_QUERY_DONE: + case METRICS_INNER_QUERY_DONE: query_status = yagpcc::QueryStatus::QUERY_STATUS_DONE; msg = "done"; break; @@ -352,16 +375,26 @@ void EventSender::collect_query_done(QueryDesc *query_desc, ereport(FATAL, (errmsg("Unexpected query status in 
query_done hook: %d", status))); } - query_msg->set_query_status(query_status); - if (connector->report_query(*query_msg, msg)) { - query_msg->Clear(); + auto *query = get_query_message(query_desc); + if (query->state != UNKNOWN || Config::report_nested_queries()) { + update_query_state(query_desc, query, QueryState::DONE, + query_status == + yagpcc::QueryStatus::QUERY_STATUS_DONE); + auto query_msg = query->message; + query_msg->set_query_status(query_status); + connector->report_query(*query_msg, msg); + } else { + // otherwise it`s a nested query being committed/aborted at top level + // and we should ignore it } + query_msgs.erase({query_desc->gpmon_pkt->u.qexec.key.ccnt, + query_desc->gpmon_pkt->u.qexec.key.tmid}); + pfree(query_desc->gpmon_pkt); } } EventSender::EventSender() { if (Config::enable_collector() && !Config::filter_user(get_user_name())) { - query_msg = new yagpcc::SetQueryReq(); try { connector = new UDSConnector(); } catch (const std::exception &e) { @@ -371,6 +404,59 @@ EventSender::EventSender() { } EventSender::~EventSender() { - delete query_msg; delete connector; -} \ No newline at end of file + for (auto iter = query_msgs.begin(); iter != query_msgs.end(); ++iter) { + delete iter->second.message; + } +} + +// That's basically a very simplistic state machine to fix or highlight any bugs +// coming from GP +void EventSender::update_query_state(QueryDesc *query_desc, QueryItem *query, + QueryState new_state, bool success) { + if (query->state == UNKNOWN) { + collect_query_submit(query_desc); + } + switch (new_state) { + case QueryState::SUBMIT: + Assert(false); + break; + case QueryState::START: + if (query->state == QueryState::SUBMIT) { + query->message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START); + } else { + Assert(false); + } + break; + case QueryState::END: + Assert(query->state == QueryState::START || IsAbortInProgress()); + query->message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_END); + break; + case 
QueryState::DONE: + Assert(query->state == QueryState::END || !success); + query->message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE); + break; + default: + Assert(false); + } + query->state = new_state; +} + +EventSender::QueryItem *EventSender::get_query_message(QueryDesc *query_desc) { + if (query_desc->gpmon_pkt == nullptr || + query_msgs.find({query_desc->gpmon_pkt->u.qexec.key.ccnt, + query_desc->gpmon_pkt->u.qexec.key.tmid}) == + query_msgs.end()) { + query_desc->gpmon_pkt = (gpmon_packet_t *)palloc0(sizeof(gpmon_packet_t)); + query_desc->gpmon_pkt->u.qexec.key.ccnt = gp_command_count; + query_desc->gpmon_pkt->u.qexec.key.tmid = nesting_level; + query_msgs.insert({{gp_command_count, nesting_level}, + QueryItem(UNKNOWN, new yagpcc::SetQueryReq())}); + } + return &query_msgs.at({query_desc->gpmon_pkt->u.qexec.key.ccnt, + query_desc->gpmon_pkt->u.qexec.key.tmid}); +} + +EventSender::QueryItem::QueryItem(EventSender::QueryState st, + yagpcc::SetQueryReq *msg) + : state(st), message(msg) {} \ No newline at end of file diff --git a/src/EventSender.h b/src/EventSender.h index 0e8985873b6..55b8daf9a91 100644 --- a/src/EventSender.h +++ b/src/EventSender.h @@ -1,7 +1,7 @@ #pragma once #include <memory> -#include <queue> +#include <unordered_map> #include <string> extern "C" { @@ -26,9 +26,31 @@ public: ~EventSender(); private: + enum QueryState { UNKNOWN, SUBMIT, START, END, DONE }; + + struct QueryItem { + QueryState state = QueryState::UNKNOWN; + yagpcc::SetQueryReq *message = nullptr; + + QueryItem(QueryState st, yagpcc::SetQueryReq *msg); + }; + + struct pair_hash { + std::size_t operator()(const std::pair<int, int> &p) const { + auto h1 = std::hash<int>{}(p.first); + auto h2 = std::hash<int>{}(p.second); + return h1 ^ h2; + } + }; + + void update_query_state(QueryDesc *query_desc, QueryItem *query, + QueryState new_state, bool success = true); + QueryItem *get_query_message(QueryDesc *query_desc); void collect_query_submit(QueryDesc *query_desc); 
void collect_query_done(QueryDesc *query_desc, QueryMetricsStatus status); + void cleanup_messages(); + UDSConnector *connector = nullptr; int nesting_level = 0; - yagpcc::SetQueryReq *query_msg; + std::unordered_map<std::pair<int, int>, QueryItem, pair_hash> query_msgs; }; \ No newline at end of file diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp index 37f80385a6b..caf38a10f6e 100644 --- a/src/hook_wrappers.cpp +++ b/src/hook_wrappers.cpp @@ -56,9 +56,9 @@ void hooks_init() { void hooks_deinit() { ExecutorStart_hook = previous_ExecutorStart_hook; + ExecutorEnd_hook = previous_ExecutorEnd_hook; ExecutorRun_hook = previous_ExecutorRun_hook; ExecutorFinish_hook = previous_ExecutorFinish_hook; - ExecutorEnd_hook = previous_ExecutorEnd_hook; query_info_collect_hook = previous_query_info_collect_hook; stat_statements_parser_deinit(); if (sender) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
