This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit 4dfbd34afdf23991ca7ac2ec96f3f6620771fc67
Author: Maxim Smyatkin <[email protected]>
AuthorDate: Fri May 17 15:55:27 2024 +0300

    [yagp_hooks_collector] Add nested query tracking
    
    Track query nesting level using a per-query key (tmid, ssid, ccnt,
    nesting_level, query_desc_addr).  Maintain a state machine per active
    query to correctly sequence submit→start→end→done across nesting
    boundaries.
---
 protos/yagpcc_metrics.proto     |  10 ++-
 protos/yagpcc_set_service.proto |  32 ++++++++--
 src/Config.cpp                  |   7 ++
 src/Config.h                    |   1 +
 src/EventSender.cpp             | 138 ++++++++++++++++++++++++++++++++--------
 src/EventSender.h               |  26 +++++++-
 src/hook_wrappers.cpp           |   2 +-
 7 files changed, 178 insertions(+), 38 deletions(-)

diff --git a/protos/yagpcc_metrics.proto b/protos/yagpcc_metrics.proto
index 2d20d3c46d9..68492732ece 100644
--- a/protos/yagpcc_metrics.proto
+++ b/protos/yagpcc_metrics.proto
@@ -36,6 +36,11 @@ message QueryInfo {
     string rsgname = 10;
 }
 
+message AdditionalQueryInfo {
+    int64 nested_level                    = 1;
+    string error_message                  = 2;
+}
+
 enum PlanGenerator
 {
     PLAN_GENERATOR_UNSPECIFIED = 0;
@@ -95,7 +100,7 @@ message MetricInstrumentation {
     uint64  nloops      = 2;    /* # of run cycles for this node */
     uint64  tuplecount  = 3;    /* Tuples emitted so far this cycle */
     double  firsttuple  = 4;    /* Time for first tuple of this cycle */
-    double  startup     = 5;    /* Total startup time (in seconds) */
+    double  startup     = 5;    /* Total startup time (in seconds) (optimiser's cost estimation) */
     double  total       = 6;    /* Total total time (in seconds) */
     uint64 shared_blks_hit     = 7; /* shared blocks stats*/
     uint64 shared_blks_read    = 8;
@@ -105,12 +110,13 @@ message MetricInstrumentation {
     uint64 local_blks_read     = 12;
     uint64 local_blks_dirtied  = 13;
     uint64 local_blks_written  = 14;
-    uint64 temp_blks_read      = 15; /* temporary tables read stat */ 
+    uint64 temp_blks_read      = 15; /* temporary tables read stat */
     uint64 temp_blks_written   = 16;
     double blk_read_time       = 17; /* measured read/write time */
     double blk_write_time      = 18;
     NetworkStat sent           = 19;
     NetworkStat received       = 20;
+    double  startup_time       = 21; /* real query startup time (planning + queue time) */
 }
 
 message SpillInfo {
diff --git a/protos/yagpcc_set_service.proto b/protos/yagpcc_set_service.proto
index e8fc7aaa99d..0b9e34df49d 100644
--- a/protos/yagpcc_set_service.proto
+++ b/protos/yagpcc_set_service.proto
@@ -9,17 +9,35 @@ package yagpcc;
 option java_outer_classname = "SegmentYAGPCCAS";
 option go_package = "a.yandex-team.ru/cloud/mdb/yagpcc/api/proto/agent_segment;greenplum";
 
+service SetQueryInfo {
+    rpc SetMetricPlanNode (SetPlanNodeReq) returns (MetricResponse) {}
+
+    rpc SetMetricQuery (SetQueryReq) returns (MetricResponse) {}
+}
+
+message MetricResponse {
+    MetricResponseStatusCode error_code = 1;
+    string error_text = 2;
+}
+
+enum MetricResponseStatusCode {
+    METRIC_RESPONSE_STATUS_CODE_UNSPECIFIED = 0;
+    METRIC_RESPONSE_STATUS_CODE_SUCCESS = 1;
+    METRIC_RESPONSE_STATUS_CODE_ERROR = 2;
+}
+
 message SetQueryReq {
-    QueryStatus           query_status = 1;
-    google.protobuf.Timestamp datetime = 2;
-    QueryKey                 query_key = 3;
-    SegmentKey             segment_key = 4;
-    QueryInfo               query_info = 5;
-    GPMetrics            query_metrics = 6;
-    repeated MetricPlan      plan_tree = 7;
+    QueryStatus           query_status    = 1;
+    google.protobuf.Timestamp datetime    = 2;
+    QueryKey                 query_key    = 3;
+    SegmentKey             segment_key    = 4;
+    QueryInfo               query_info    = 5;
+    GPMetrics            query_metrics    = 6;
+    repeated MetricPlan      plan_tree    = 7;
     google.protobuf.Timestamp submit_time = 8;
     google.protobuf.Timestamp start_time  = 9;
     google.protobuf.Timestamp end_time    = 10;
+    AdditionalQueryInfo add_info          = 11;
 }
 
 message SetPlanNodeReq {
diff --git a/src/Config.cpp b/src/Config.cpp
index c5c2c15f7e9..1bbad9a6ea3 100644
--- a/src/Config.cpp
+++ b/src/Config.cpp
@@ -13,6 +13,7 @@ static char *guc_uds_path = nullptr;
 static bool guc_enable_analyze = true;
 static bool guc_enable_cdbstats = true;
 static bool guc_enable_collector = true;
+static bool guc_report_nested_queries = true;
 static char *guc_ignored_users = nullptr;
 static std::unique_ptr<std::unordered_set<std::string>> ignored_users = nullptr;
 
@@ -36,6 +37,11 @@ void Config::init() {
       &guc_enable_cdbstats, true, PGC_SUSET,
       GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC, 0LL, 0LL, 0LL);
 
+  DefineCustomBoolVariable(
+      "yagpcc.report_nested_queries", "Collect stats on nested queries", 0LL,
+      &guc_report_nested_queries, true, PGC_SUSET,
+      GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC, 0LL, 0LL, 0LL);
+
   DefineCustomStringVariable(
       "yagpcc.ignored_users_list",
       "Make yagpcc ignore queries issued by given users", 0LL,
@@ -47,6 +53,7 @@ std::string Config::uds_path() { return guc_uds_path; }
 bool Config::enable_analyze() { return guc_enable_analyze; }
 bool Config::enable_cdbstats() { return guc_enable_cdbstats; }
 bool Config::enable_collector() { return guc_enable_collector; }
+bool Config::report_nested_queries() { return guc_report_nested_queries; }
 
 bool Config::filter_user(const std::string *username) {
   if (!ignored_users) {
diff --git a/src/Config.h b/src/Config.h
index 999d0300640..15f425be67c 100644
--- a/src/Config.h
+++ b/src/Config.h
@@ -10,4 +10,5 @@ public:
   static bool enable_cdbstats();
   static bool enable_collector();
   static bool filter_user(const std::string *username);
+  static bool report_nested_queries();
 };
\ No newline at end of file
diff --git a/src/EventSender.cpp b/src/EventSender.cpp
index 21c2e2117a3..116805d0646 100644
--- a/src/EventSender.cpp
+++ b/src/EventSender.cpp
@@ -10,6 +10,7 @@ extern "C" {
 #include "postgres.h"
 
 #include "access/hash.h"
+#include "access/xact.h"
 #include "commands/dbcommands.h"
 #include "commands/explain.h"
 #include "commands/resgroupcmds.h"
@@ -30,11 +31,6 @@ extern "C" {
 
 #include "EventSender.h"
 
-#define need_collect()                                                         \
-  (nesting_level == 0 && gp_command_count != 0 &&                              \
-   query_desc->sourceText != nullptr && Config::enable_collector() &&          \
-   !Config::filter_user(get_user_name()))
-
 namespace {
 
 std::string *get_user_name() {
@@ -146,6 +142,11 @@ void set_query_info(yagpcc::SetQueryReq *req, QueryDesc *query_desc) {
   }
 }
 
+void set_qi_nesting_level(yagpcc::SetQueryReq *req, int nesting_level) {
+  auto aqi = req->mutable_add_info();
+  aqi->set_nested_level(nesting_level);
+}
+
 void set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics,
                                 QueryDesc *query_desc) {
   auto instrument = query_desc->planstate->instrument;
@@ -210,6 +211,19 @@ yagpcc::SetQueryReq create_query_req(QueryDesc *query_desc,
   return req;
 }
 
+inline bool is_top_level_query(QueryDesc *query_desc, int nesting_level) {
+  return (query_desc->gpmon_pkt &&
+          query_desc->gpmon_pkt->u.qexec.key.tmid == 0) ||
+         nesting_level == 0;
+}
+
+inline bool need_collect(QueryDesc *query_desc, int nesting_level) {
+  return (Config::report_nested_queries() ||
+          is_top_level_query(query_desc, nesting_level)) &&
+         gp_command_count != 0 && query_desc->sourceText != nullptr &&
+         Config::enable_collector() && !Config::filter_user(get_user_name());
+}
+
 } // namespace
 
 void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) {
@@ -223,7 +237,8 @@ void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) {
     // TODO
     break;
   case METRICS_QUERY_SUBMIT:
-    collect_query_submit(reinterpret_cast<QueryDesc *>(arg));
+    // don't collect anything here. We will fake this call in ExecutorStart as
+    // it really makes no difference. Just complicates things
     break;
   case METRICS_QUERY_START:
     // no-op: executor_after_start is enough
@@ -232,10 +247,8 @@ void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) {
   case METRICS_QUERY_ERROR:
   case METRICS_QUERY_CANCELING:
   case METRICS_QUERY_CANCELED:
-    collect_query_done(reinterpret_cast<QueryDesc *>(arg), status);
-    break;
   case METRICS_INNER_QUERY_DONE:
-    // TODO
+    collect_query_done(reinterpret_cast<QueryDesc *>(arg), status);
     break;
   default:
     ereport(FATAL, (errmsg("Unknown query status: %d", status)));
@@ -247,9 +260,10 @@ void EventSender::executor_before_start(QueryDesc *query_desc,
   if (!connector) {
     return;
   }
-  if (!need_collect()) {
+  if (!need_collect(query_desc, nesting_level)) {
     return;
   }
+  collect_query_submit(query_desc);
   query_start_time = std::chrono::high_resolution_clock::now();
   WorkfileResetBackendStats();
   if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze()) {
@@ -273,8 +287,10 @@ void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) {
     return;
   }
   if ((Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) &&
-      need_collect()) {
-    query_msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START);
+      need_collect(query_desc, nesting_level)) {
+    auto *query = get_query_message(query_desc);
+    update_query_state(query_desc, query, QueryState::START);
+    auto query_msg = query->message;
     *query_msg->mutable_start_time() = current_ts();
     set_query_plan(query_msg, query_desc);
     if (connector->report_query(*query_msg, "started")) {
@@ -287,7 +303,7 @@ void EventSender::executor_end(QueryDesc *query_desc) {
   if (!connector) {
     return;
   }
-  if (!need_collect() ||
+  if (!need_collect(query_desc, nesting_level) ||
       (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE)) {
     return;
   }
@@ -301,7 +317,13 @@ void EventSender::executor_end(QueryDesc *query_desc) {
     cdbdisp_checkDispatchResult(query_desc->estate->dispatcherState,
                                 DISPATCH_WAIT_NONE);
   }*/
-  query_msg->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_END);
+  auto *query = get_query_message(query_desc);
+  if (query->state == UNKNOWN && !Config::report_nested_queries()) {
+    // COMMIT/ROLLBACK of a nested query. Happens in top-level
+    return;
+  }
+  update_query_state(query_desc, query, QueryState::END);
+  auto query_msg = query->message;
   *query_msg->mutable_end_time() = current_ts();
   set_gp_metrics(query_msg->mutable_query_metrics(), query_desc);
   if (connector->report_query(*query_msg, "ended")) {
@@ -310,15 +332,15 @@ void EventSender::executor_end(QueryDesc *query_desc) {
 }
 
 void EventSender::collect_query_submit(QueryDesc *query_desc) {
-  if (connector && need_collect()) {
-    if (query_msg && query_msg->has_query_key()) {
-      connector->report_query(*query_msg, "previous query");
-      query_msg->Clear();
-    }
+  if (connector && need_collect(query_desc, nesting_level)) {
+    auto *query = get_query_message(query_desc);
+    query->state = QueryState::SUBMIT;
+    auto query_msg = query->message;
     *query_msg =
         create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_SUBMIT);
     *query_msg->mutable_submit_time() = current_ts();
     set_query_info(query_msg, query_desc);
+    set_qi_nesting_level(query_msg, query_desc->gpmon_pkt->u.qexec.key.tmid);
     set_query_text(query_msg, query_desc);
     if (connector->report_query(*query_msg, "submit")) {
       clear_big_fields(query_msg);
@@ -328,11 +350,12 @@ void EventSender::collect_query_submit(QueryDesc *query_desc) {
 
 void EventSender::collect_query_done(QueryDesc *query_desc,
                                      QueryMetricsStatus status) {
-  if (connector && need_collect()) {
+  if (connector && need_collect(query_desc, nesting_level)) {
     yagpcc::QueryStatus query_status;
     std::string msg;
     switch (status) {
     case METRICS_QUERY_DONE:
+    case METRICS_INNER_QUERY_DONE:
       query_status = yagpcc::QueryStatus::QUERY_STATUS_DONE;
       msg = "done";
       break;
@@ -352,16 +375,26 @@ void EventSender::collect_query_done(QueryDesc *query_desc,
       ereport(FATAL, (errmsg("Unexpected query status in query_done hook: %d",
                              status)));
     }
-    query_msg->set_query_status(query_status);
-    if (connector->report_query(*query_msg, msg)) {
-      query_msg->Clear();
+    auto *query = get_query_message(query_desc);
+    if (query->state != UNKNOWN || Config::report_nested_queries()) {
+      update_query_state(query_desc, query, QueryState::DONE,
+                         query_status ==
+                             yagpcc::QueryStatus::QUERY_STATUS_DONE);
+      auto query_msg = query->message;
+      query_msg->set_query_status(query_status);
+      connector->report_query(*query_msg, msg);
+    } else {
+      // otherwise it`s a nested query being committed/aborted at top level
+      // and we should ignore it
     }
+    query_msgs.erase({query_desc->gpmon_pkt->u.qexec.key.ccnt,
+                      query_desc->gpmon_pkt->u.qexec.key.tmid});
+    pfree(query_desc->gpmon_pkt);
   }
 }
 
 EventSender::EventSender() {
   if (Config::enable_collector() && !Config::filter_user(get_user_name())) {
-    query_msg = new yagpcc::SetQueryReq();
     try {
       connector = new UDSConnector();
     } catch (const std::exception &e) {
@@ -371,6 +404,59 @@ EventSender::EventSender() {
 }
 
 EventSender::~EventSender() {
-  delete query_msg;
   delete connector;
-}
\ No newline at end of file
+  for (auto iter = query_msgs.begin(); iter != query_msgs.end(); ++iter) {
+    delete iter->second.message;
+  }
+}
+
+// That's basically a very simplistic state machine to fix or highlight any bugs
+// coming from GP
+void EventSender::update_query_state(QueryDesc *query_desc, QueryItem *query,
+                                     QueryState new_state, bool success) {
+  if (query->state == UNKNOWN) {
+    collect_query_submit(query_desc);
+  }
+  switch (new_state) {
+  case QueryState::SUBMIT:
+    Assert(false);
+    break;
+  case QueryState::START:
+    if (query->state == QueryState::SUBMIT) {
+      query->message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START);
+    } else {
+      Assert(false);
+    }
+    break;
+  case QueryState::END:
+    Assert(query->state == QueryState::START || IsAbortInProgress());
+    query->message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_END);
+    break;
+  case QueryState::DONE:
+    Assert(query->state == QueryState::END || !success);
+    query->message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE);
+    break;
+  default:
+    Assert(false);
+  }
+  query->state = new_state;
+}
+
+EventSender::QueryItem *EventSender::get_query_message(QueryDesc *query_desc) {
+  if (query_desc->gpmon_pkt == nullptr ||
+      query_msgs.find({query_desc->gpmon_pkt->u.qexec.key.ccnt,
+                       query_desc->gpmon_pkt->u.qexec.key.tmid}) ==
+          query_msgs.end()) {
+    query_desc->gpmon_pkt = (gpmon_packet_t *)palloc0(sizeof(gpmon_packet_t));
+    query_desc->gpmon_pkt->u.qexec.key.ccnt = gp_command_count;
+    query_desc->gpmon_pkt->u.qexec.key.tmid = nesting_level;
+    query_msgs.insert({{gp_command_count, nesting_level},
+                       QueryItem(UNKNOWN, new yagpcc::SetQueryReq())});
+  }
+  return &query_msgs.at({query_desc->gpmon_pkt->u.qexec.key.ccnt,
+                         query_desc->gpmon_pkt->u.qexec.key.tmid});
+}
+
+EventSender::QueryItem::QueryItem(EventSender::QueryState st,
+                                  yagpcc::SetQueryReq *msg)
+    : state(st), message(msg) {}
\ No newline at end of file
diff --git a/src/EventSender.h b/src/EventSender.h
index 0e8985873b6..55b8daf9a91 100644
--- a/src/EventSender.h
+++ b/src/EventSender.h
@@ -1,7 +1,7 @@
 #pragma once
 
 #include <memory>
-#include <queue>
+#include <unordered_map>
 #include <string>
 
 extern "C" {
@@ -26,9 +26,31 @@ public:
   ~EventSender();
 
 private:
+  enum QueryState { UNKNOWN, SUBMIT, START, END, DONE };
+
+  struct QueryItem {
+    QueryState state = QueryState::UNKNOWN;
+    yagpcc::SetQueryReq *message = nullptr;
+
+    QueryItem(QueryState st, yagpcc::SetQueryReq *msg);
+  };
+
+  struct pair_hash {
+    std::size_t operator()(const std::pair<int, int> &p) const {
+      auto h1 = std::hash<int>{}(p.first);
+      auto h2 = std::hash<int>{}(p.second);
+      return h1 ^ h2;
+    }
+  };
+
+  void update_query_state(QueryDesc *query_desc, QueryItem *query,
+                          QueryState new_state, bool success = true);
+  QueryItem *get_query_message(QueryDesc *query_desc);
   void collect_query_submit(QueryDesc *query_desc);
   void collect_query_done(QueryDesc *query_desc, QueryMetricsStatus status);
+  void cleanup_messages();
+
   UDSConnector *connector = nullptr;
   int nesting_level = 0;
-  yagpcc::SetQueryReq *query_msg;
+  std::unordered_map<std::pair<int, int>, QueryItem, pair_hash> query_msgs;
 };
\ No newline at end of file
diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp
index 37f80385a6b..caf38a10f6e 100644
--- a/src/hook_wrappers.cpp
+++ b/src/hook_wrappers.cpp
@@ -56,9 +56,9 @@ void hooks_init() {
 
 void hooks_deinit() {
   ExecutorStart_hook = previous_ExecutorStart_hook;
+  ExecutorEnd_hook = previous_ExecutorEnd_hook;
   ExecutorRun_hook = previous_ExecutorRun_hook;
   ExecutorFinish_hook = previous_ExecutorFinish_hook;
-  ExecutorEnd_hook = previous_ExecutorEnd_hook;
   query_info_collect_hook = previous_query_info_collect_hook;
   stat_statements_parser_deinit();
   if (sender) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to