This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit 729b35f86fd5df36beba2b7f7ee7a7dd2d1ef4d2
Author: Maxim Smyatkin <[email protected]>
AuthorDate: Mon Apr 10 16:01:08 2023 +0300

    [yagp_hooks_collector] Switch to query_info_collect_hook and fix stability
    
    Use query_info_collect_hook for finer-grained lifecycle tracking.
    Fix two segfaults in early init paths.  Skip hooks in UTILITY mode.
    General robustness improvements.
---
 protos/yagpcc_set_service.proto                    |   7 +-
 src/EventSender.cpp                                | 207 ++++++++++++++-------
 src/EventSender.h                                  |  13 +-
 src/GrpcConnector.cpp                              |   4 +-
 src/GrpcConnector.h                                |   2 +-
 src/hook_wrappers.cpp                              |  65 ++++---
 .../pg_stat_statements_ya_parser.c                 |  21 +++
 7 files changed, 206 insertions(+), 113 deletions(-)

diff --git a/protos/yagpcc_set_service.proto b/protos/yagpcc_set_service.proto
index 97c5691a6f5..93c2f5a01d1 100644
--- a/protos/yagpcc_set_service.proto
+++ b/protos/yagpcc_set_service.proto
@@ -30,9 +30,10 @@ message SetQueryReq {
     QueryStatus           query_status = 1;
     google.protobuf.Timestamp datetime = 2;
     QueryKey                 query_key = 3;
-    QueryInfo               query_info = 4;
-    GPMetrics            query_metrics = 5;
-    repeated MetricPlan      plan_tree = 6;
+    SegmentKey             segment_key = 4; // NOTE: shifts old fields 4-6 to 5-7; peers need this exact schema
+    QueryInfo               query_info = 5;
+    GPMetrics            query_metrics = 6;
+    repeated MetricPlan      plan_tree = 7;
 }
 
 message SetPlanNodeReq {
diff --git a/src/EventSender.cpp b/src/EventSender.cpp
index b7c3cd70b85..5ab6bbd60df 100644
--- a/src/EventSender.cpp
+++ b/src/EventSender.cpp
@@ -1,29 +1,30 @@
-#include "EventSender.h"
 #include "GrpcConnector.h"
 #include "ProcStats.h"
-#include "protos/yagpcc_set_service.pb.h"
 #include <ctime>
 
 extern "C" {
 #include "postgres.h"
+
 #include "access/hash.h"
-#include "utils/metrics_utils.h"
-#include "utils/elog.h"
-#include "executor/executor.h"
-#include "commands/explain.h"
 #include "commands/dbcommands.h"
+#include "commands/explain.h"
 #include "commands/resgroupcmds.h"
+#include "executor/executor.h"
+#include "utils/elog.h"
+#include "utils/metrics_utils.h"
 
-#include "cdb/cdbvars.h"
 #include "cdb/cdbexplain.h"
+#include "cdb/cdbvars.h"
 
+#include "stat_statements_parser/pg_stat_statements_ya_parser.h"
 #include "tcop/utility.h"
-#include "pg_stat_statements_ya_parser.h"
 
 void get_spill_info(int ssid, int ccid, int32_t *file_count,
                     int64_t *total_bytes);
 }
 
+#include "EventSender.h"
+
 namespace {
 
 std::string *get_user_name() {
@@ -102,90 +103,152 @@ void set_query_text(yagpcc::QueryInfo *qi, QueryDesc 
*query_desc) {
   pfree(norm_query);
 }
 
-void set_query_info(yagpcc::QueryInfo *qi, QueryDesc *query_desc) {
-  if (query_desc->sourceText) {
-    set_query_text(qi, query_desc);
+void set_query_info(yagpcc::QueryInfo *qi, QueryDesc *query_desc,
+                    bool with_text, bool with_plan) {
+  if (Gp_session_role == GP_ROLE_DISPATCH) {
+    if (query_desc->sourceText && with_text) {
+      set_query_text(qi, query_desc);
+    }
+    if (query_desc->plannedstmt && with_plan) {
+      set_query_plan(qi, query_desc);
+      qi->set_query_id(query_desc->plannedstmt->queryId);
+    }
+    qi->set_allocated_username(get_user_name());
+    qi->set_allocated_databasename(get_db_name());
   }
-  if (query_desc->plannedstmt) {
-    set_query_plan(qi, query_desc);
-    qi->set_query_id(query_desc->plannedstmt->queryId);
-  }
-  qi->set_allocated_username(get_user_name());
-  qi->set_allocated_databasename(get_db_name());
 }
 
 void set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics,
                                 QueryDesc *query_desc) {
   auto instrument = query_desc->planstate->instrument;
-  metrics->set_ntuples(instrument->ntuples);
-  metrics->set_nloops(instrument->nloops);
-  metrics->set_tuplecount(instrument->tuplecount);
-  metrics->set_firsttuple(instrument->firsttuple);
-  metrics->set_startup(instrument->startup);
-  metrics->set_total(instrument->total);
-  auto &buffusage = instrument->bufusage;
-  metrics->set_shared_blks_hit(buffusage.shared_blks_hit);
-  metrics->set_shared_blks_read(buffusage.shared_blks_read);
-  metrics->set_shared_blks_dirtied(buffusage.shared_blks_dirtied);
-  metrics->set_shared_blks_written(buffusage.shared_blks_written);
-  metrics->set_local_blks_hit(buffusage.local_blks_hit);
-  metrics->set_local_blks_read(buffusage.local_blks_read);
-  metrics->set_local_blks_dirtied(buffusage.local_blks_dirtied);
-  metrics->set_local_blks_written(buffusage.local_blks_written);
-  metrics->set_temp_blks_read(buffusage.temp_blks_read);
-  metrics->set_temp_blks_written(buffusage.temp_blks_written);
-  metrics->set_blk_read_time(INSTR_TIME_GET_DOUBLE(buffusage.blk_read_time));
-  metrics->set_blk_write_time(INSTR_TIME_GET_DOUBLE(buffusage.blk_write_time));
-}
-
-void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc) {
-  int32_t n_spill_files = 0;
-  int64_t n_spill_bytes = 0;
-  get_spill_info(gp_session_id, gp_command_count, &n_spill_files,
-                 &n_spill_bytes);
-  metrics->mutable_spill()->set_filecount(n_spill_files);
-  metrics->mutable_spill()->set_totalbytes(n_spill_bytes);
-  if (query_desc->planstate->instrument) {
+  if (instrument) {
+    metrics->set_ntuples(instrument->ntuples);
+    metrics->set_nloops(instrument->nloops);
+    metrics->set_tuplecount(instrument->tuplecount);
+    metrics->set_firsttuple(instrument->firsttuple);
+    metrics->set_startup(instrument->startup);
+    metrics->set_total(instrument->total);
+    auto &buffusage = instrument->bufusage;
+    metrics->set_shared_blks_hit(buffusage.shared_blks_hit);
+    metrics->set_shared_blks_read(buffusage.shared_blks_read);
+    metrics->set_shared_blks_dirtied(buffusage.shared_blks_dirtied);
+    metrics->set_shared_blks_written(buffusage.shared_blks_written);
+    metrics->set_local_blks_hit(buffusage.local_blks_hit);
+    metrics->set_local_blks_read(buffusage.local_blks_read);
+    metrics->set_local_blks_dirtied(buffusage.local_blks_dirtied);
+    metrics->set_local_blks_written(buffusage.local_blks_written);
+    metrics->set_temp_blks_read(buffusage.temp_blks_read);
+    metrics->set_temp_blks_written(buffusage.temp_blks_written);
+    metrics->set_blk_read_time(INSTR_TIME_GET_DOUBLE(buffusage.blk_read_time));
+    metrics->set_blk_write_time(
+        INSTR_TIME_GET_DOUBLE(buffusage.blk_write_time));
+  }
+}
+
+void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc,
+                    bool need_spillinfo) {
+  if (need_spillinfo) {
+    int32_t n_spill_files = 0;
+    int64_t n_spill_bytes = 0;
+    get_spill_info(gp_session_id, gp_command_count, &n_spill_files,
+                   &n_spill_bytes);
+    metrics->mutable_spill()->set_filecount(n_spill_files);
+    metrics->mutable_spill()->set_totalbytes(n_spill_bytes);
+  }
+  if (query_desc->planstate && query_desc->planstate->instrument) {
     set_metric_instrumentation(metrics->mutable_instrumentation(), query_desc);
   }
   fill_self_stats(metrics->mutable_systemstat());
 }
 
+yagpcc::SetQueryReq create_query_req(QueryDesc *query_desc,
+                                     yagpcc::QueryStatus status) {
+  yagpcc::SetQueryReq req;
+  req.set_query_status(status);
+  *req.mutable_datetime() = current_ts();
+  set_query_key(req.mutable_query_key(), query_desc);
+  set_segment_key(req.mutable_segment_key(), query_desc);
+  return req;
+}
+
 } // namespace
 
-void EventSender::ExecutorStart(QueryDesc *query_desc, int /* eflags*/) {
+void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) {
+  if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) {
+    return;
+  }
+  switch (status) {
+  case METRICS_PLAN_NODE_INITIALIZE:
+  case METRICS_PLAN_NODE_EXECUTING:
+  case METRICS_PLAN_NODE_FINISHED:
+    // TODO: per-plan-node events are not reported yet
+    break;
+  case METRICS_QUERY_SUBMIT:
+    collect_query_submit(static_cast<QueryDesc *>(arg));
+    break;
+  case METRICS_QUERY_START:
+    // no-op: executor_after_start is enough
+    break;
+  case METRICS_QUERY_DONE:
+    collect_query_done(static_cast<QueryDesc *>(arg), "done");
+    break;
+  case METRICS_QUERY_ERROR:
+    collect_query_done(static_cast<QueryDesc *>(arg), "error");
+    break;
+  case METRICS_QUERY_CANCELING:
+    collect_query_done(static_cast<QueryDesc *>(arg), "cancelling");
+    break;
+  case METRICS_QUERY_CANCELED:
+    collect_query_done(static_cast<QueryDesc *>(arg), "cancelled");
+    break;
+  case METRICS_INNER_QUERY_DONE:
+    // TODO: inner (nested) query completion is not reported yet
+    break;
+  default: // unknown status: warn only, never terminate the session
+    elog(WARNING, "Unknown query status: %d", status);
+  }
+}
+
+void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) 
{
+  if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) {
+    return;
+  }
+  auto req =
+      create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_START);
+  set_query_info(req.mutable_query_info(), query_desc, false, true);
+  send_query_info(&req, "started");
+}
+
+void EventSender::collect_query_submit(QueryDesc *query_desc) {
   query_desc->instrument_options |= INSTRUMENT_BUFFERS;
   query_desc->instrument_options |= INSTRUMENT_ROWS;
   query_desc->instrument_options |= INSTRUMENT_TIMER;
 
-  elog(DEBUG1, "Query %s start recording", query_desc->sourceText);
-  yagpcc::SetQueryReq req;
-  req.set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START);
-  *req.mutable_datetime() = current_ts();
-  set_query_key(req.mutable_query_key(), query_desc);
-  auto result = connector->set_metric_query(req);
-  if (result.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR) {
-    elog(WARNING, "Query %s start reporting failed with an error %s",
-         query_desc->sourceText, result.error_text().c_str());
-  } else {
-    elog(DEBUG1, "Query %s start successful", query_desc->sourceText);
-  }
+  auto req =
+      create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_SUBMIT);
+  set_query_info(req.mutable_query_info(), query_desc, true, false);
+  send_query_info(&req, "submit");
 }
 
-void EventSender::ExecutorFinish(QueryDesc *query_desc) {
-  elog(DEBUG1, "Query %s finish recording", query_desc->sourceText);
-  yagpcc::SetQueryReq req;
-  req.set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE);
-  *req.mutable_datetime() = current_ts();
-  set_query_key(req.mutable_query_key(), query_desc);
-  set_query_info(req.mutable_query_info(), query_desc);
-  set_gp_metrics(req.mutable_query_metrics(), query_desc);
-  auto result = connector->set_metric_query(req);
+void EventSender::collect_query_done(QueryDesc *query_desc,
+                                     const std::string &status) {
+  auto req =
+      create_query_req(query_desc, yagpcc::QueryStatus::QUERY_STATUS_DONE);
+  set_query_info(req.mutable_query_info(), query_desc, false, false);
+  // NOTE(review): done/error/cancel all send QUERY_STATUS_DONE; `status` only
+  // labels failure logs. No cumulative spill stats AFAIU, so spill is skipped.
+  set_gp_metrics(req.mutable_query_metrics(), query_desc,
+                 /*need_spillinfo*/ false);
+  send_query_info(&req, status);
+}
+
+void EventSender::send_query_info(yagpcc::SetQueryReq *req,
+                                  const std::string &event) {
+  auto result = connector->set_metric_query(*req);
   if (result.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR) {
-    elog(WARNING, "Query %s finish reporting failed with an error %s",
-         query_desc->sourceText, result.error_text().c_str());
-  } else {
-    elog(DEBUG1, "Query %s finish successful", query_desc->sourceText);
+    elog(WARNING, "Query {%d-%d-%d} %s reporting failed with an error %s",
+         req->query_key().tmid(), req->query_key().ssid(),
+         req->query_key().ccnt(), event.c_str(), result.error_text().c_str());
   }
 }
 
diff --git a/src/EventSender.h b/src/EventSender.h
index d69958db9b0..9c574cba9a1 100644
--- a/src/EventSender.h
+++ b/src/EventSender.h
@@ -1,18 +1,25 @@
 #pragma once
 
 #include <memory>
+#include <string>
 
 class GrpcConnector;
-
 struct QueryDesc;
+namespace yagpcc {
+class SetQueryReq;
+}
 
 class EventSender {
 public:
-  void ExecutorStart(QueryDesc *query_desc, int eflags);
-  void ExecutorFinish(QueryDesc *query_desc);
+  void executor_after_start(QueryDesc *query_desc, int eflags);
+  void query_metrics_collect(QueryMetricsStatus status, void *arg);
   static EventSender *instance();
 
 private:
+  void collect_query_submit(QueryDesc *query_desc);
+  void collect_query_done(QueryDesc *query_desc, const std::string &status);
+
   EventSender();
+  void send_query_info(yagpcc::SetQueryReq *req, const std::string &event);
   std::unique_ptr<GrpcConnector> connector;
 };
\ No newline at end of file
diff --git a/src/GrpcConnector.cpp b/src/GrpcConnector.cpp
index 1a820404428..bca1acd9ce2 100644
--- a/src/GrpcConnector.cpp
+++ b/src/GrpcConnector.cpp
@@ -16,8 +16,10 @@ public:
   yagpcc::MetricResponse set_metric_query(yagpcc::SetQueryReq req) {
     yagpcc::MetricResponse response;
     grpc::ClientContext context;
+    // TODO: the stub call below is synchronous, so every report can stall the
+    // backend for up to this deadline; prefer async sends over a fixed timeout
     auto deadline =
-        std::chrono::system_clock::now() + std::chrono::milliseconds(50);
+        std::chrono::system_clock::now() + std::chrono::milliseconds(200);
     context.set_deadline(deadline);
 
     grpc::Status status = (stub->SetMetricQuery)(&context, req, &response);
diff --git a/src/GrpcConnector.h b/src/GrpcConnector.h
index 810c0bd3e15..4fca6960a4e 100644
--- a/src/GrpcConnector.h
+++ b/src/GrpcConnector.h
@@ -1,6 +1,6 @@
 #pragma once
 
-#include "yagpcc_set_service.pb.h"
+#include "protos/yagpcc_set_service.pb.h"
 
 class GrpcConnector {
 public:
diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp
index 739cca80f01..be39c953970 100644
--- a/src/hook_wrappers.cpp
+++ b/src/hook_wrappers.cpp
@@ -1,6 +1,3 @@
-#include "hook_wrappers.h"
-#include "EventSender.h"
-
 extern "C" {
 #include "postgres.h"
 #include "utils/metrics_utils.h"
@@ -14,55 +11,57 @@ extern "C" {
 }
 
 #include "stat_statements_parser/pg_stat_statements_ya_parser.h"
+#include "hook_wrappers.h"
+#include "EventSender.h"
 
 static ExecutorStart_hook_type previous_ExecutorStart_hook = nullptr;
-static ExecutorFinish_hook_type previous_ExecutorFinish_hook = nullptr;
-
-static void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags);
-static void ya_ExecutorFinish_hook(QueryDesc *query_desc);
+static query_info_collect_hook_type previous_query_info_collect_hook = nullptr;
 
-#define REPLACE_HOOK(hookName)                                                 
\
-  previous_##hookName = hookName;                                              
\
-  hookName = ya_##hookName;
+static void ya_ExecutorAfterStart_hook(QueryDesc *query_desc, int eflags);
+static void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg);
 
 void hooks_init() {
-  REPLACE_HOOK(ExecutorStart_hook);
-  REPLACE_HOOK(ExecutorFinish_hook);
+  previous_ExecutorStart_hook = ExecutorStart_hook;
+  ExecutorStart_hook = ya_ExecutorAfterStart_hook;
+  previous_query_info_collect_hook = query_info_collect_hook;
+  query_info_collect_hook = ya_query_info_collect_hook;
   stat_statements_parser_init();
 }
 
 void hooks_deinit() {
   ExecutorStart_hook = previous_ExecutorStart_hook;
-  ExecutorFinish_hook = previous_ExecutorFinish_hook;
+  query_info_collect_hook = previous_query_info_collect_hook;
   stat_statements_parser_deinit();
 }
 
-#define CREATE_HOOK_WRAPPER(hookName, ...)                                     
\
-  PG_TRY();                                                                    
\
-  { EventSender::instance()->hookName(__VA_ARGS__); }                          
\
-  PG_CATCH();                                                                  
\
-  {                                                                            
\
-    ereport(WARNING, (errmsg("EventSender failed in %s", #hookName)));         
\
-    PG_RE_THROW();                                                             
\
-  }                                                                            
\
-  PG_END_TRY();                                                                
\
-  if (previous_##hookName##_hook)                                              
\
-    (*previous_##hookName##_hook)(__VA_ARGS__);                                
\
-  else                                                                         
\
-    standard_##hookName(__VA_ARGS__);
-
-void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) {
-  CREATE_HOOK_WRAPPER(ExecutorStart, query_desc, eflags);
+void ya_ExecutorAfterStart_hook(QueryDesc *query_desc, int eflags) {
+  if (previous_ExecutorStart_hook) {
+    (*previous_ExecutorStart_hook)(query_desc, eflags);
+  } else {
+    standard_ExecutorStart(query_desc, eflags);
+  }
   PG_TRY();
-  { EventSender::instance()->ExecutorStart(query_desc, eflags); }
+  { EventSender::instance()->executor_after_start(query_desc, eflags); }
   PG_CATCH();
   {
-    ereport(WARNING, (errmsg("EventSender failed in ExecutorStart 
afterhook")));
+    ereport(WARNING,
+            (errmsg("EventSender failed in ya_ExecutorAfterStart_hook")));
     PG_RE_THROW();
   }
   PG_END_TRY();
 }
 
-void ya_ExecutorFinish_hook(QueryDesc *query_desc) {
-  CREATE_HOOK_WRAPPER(ExecutorFinish, query_desc);
+void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg) {
+  PG_TRY();
+  { EventSender::instance()->query_metrics_collect(status, arg); }
+  PG_CATCH();
+  {
+    ereport(WARNING,
+            (errmsg("EventSender failed in ya_query_info_collect_hook")));
+    PG_RE_THROW();
+  }
+  PG_END_TRY();
+  if (previous_query_info_collect_hook) {
+    (*previous_query_info_collect_hook)(status, arg);
+  }
 }
\ No newline at end of file
diff --git a/src/stat_statements_parser/pg_stat_statements_ya_parser.c 
b/src/stat_statements_parser/pg_stat_statements_ya_parser.c
index ae79e7dc40a..737e77745df 100644
--- a/src/stat_statements_parser/pg_stat_statements_ya_parser.c
+++ b/src/stat_statements_parser/pg_stat_statements_ya_parser.c
@@ -205,6 +205,13 @@ JumbleRangeTable(pgssJumbleState *jstate, List *rtable)
                        APP_JUMB_STRING(rte->ctename);
                        APP_JUMB(rte->ctelevelsup);
                        break;
+               /* GPDB RTEs */
+               case RTE_VOID:
+                       break;
+               case RTE_TABLEFUNCTION:
+                       JumbleQuery(jstate, rte->subquery);
+                       JumbleExpr(jstate, (Node *)rte->functions);
+                       break;
                default:
                        elog(ERROR, "unrecognized RTE kind: %d", 
(int)rte->rtekind);
                        break;
@@ -609,6 +616,20 @@ JumbleExpr(pgssJumbleState *jstate, Node *node)
                JumbleExpr(jstate, rtfunc->funcexpr);
        }
        break;
+       /* GPDB nodes */
+       case T_GroupingFunc:
+       {
+               GroupingFunc *grpnode = (GroupingFunc *)node;
+
+               JumbleExpr(jstate, (Node *)grpnode->args);
+       }
+       break;
+       case T_Grouping:
+       case T_GroupId:
+       case T_Integer:
+       case T_Value:
+               // TODO: these nodes are skipped, so they contribute nothing to the jumble
+               break;
        default:
                /* Only a warning, since we can stumble along anyway */
                elog(WARNING, "unrecognized node type: %d",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to