This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit b3bf74265f47931ac910dbd053239f7cc1d03588
Author: Maxim Smyatkin <[email protected]>
AuthorDate: Wed Mar 29 16:10:20 2023 +0300

    [yagp_hooks_collector] Add executor instrumentation, /proc stats, and 
normalized texts
    
    Collect spill info (file count, bytes written).  Generate normalized
    query and plan texts using a pg_stat_statements-derived parser.
    Collect buffer I/O counters, tuple counts, timing, and /proc/self
    CPU/memory/IO statistics.
---
 Makefile                                           |   1 -
 protos/yagpcc_metrics.proto                        |  42 +---
 protos/yagpcc_set_service.proto                    |  20 +-
 src/EventSender.cpp                                | 164 ++++++++------
 src/EventSender.h                                  |   4 +-
 src/ProcStats.cpp                                  | 119 ++++++++++
 src/ProcStats.h                                    |   7 +
 src/SpillInfoWrapper.c                             |  21 ++
 src/hook_wrappers.cpp                              |  22 +-
 .../pg_stat_statements_ya_parser.c                 | 248 +++++++++++++++++++--
 .../pg_stat_statements_ya_parser.h                 |   3 +-
 11 files changed, 519 insertions(+), 132 deletions(-)

diff --git a/Makefile b/Makefile
index 15c5dabb70e..0a21cf136ff 100644
--- a/Makefile
+++ b/Makefile
@@ -11,7 +11,6 @@
 
 # AIX make defaults to building *every* target of the first rule.  Start with
 # a single-target, empty rule to make the other targets non-default.
-all:
 
 all check install installdirs installcheck installcheck-parallel uninstall 
clean distclean maintainer-clean dist distcheck world check-world install-world 
installcheck-world installcheck-resgroup installcheck-resgroup-v2:
        @if [ ! -f GNUmakefile ] ; then \
diff --git a/protos/yagpcc_metrics.proto b/protos/yagpcc_metrics.proto
index b7e255484c7..f00f329a208 100644
--- a/protos/yagpcc_metrics.proto
+++ b/protos/yagpcc_metrics.proto
@@ -27,9 +27,12 @@ message QueryInfo {
     PlanGenerator generator = 1;
     uint64 query_id = 2;
     uint64 plan_id = 3;
-    string queryText = 4;
-    string planText = 5;
-    SessionInfo sessionInfo = 6;
+    string query_text = 4;
+    string plan_text = 5;
+    string temlate_query_text = 6;
+    string temlate_plan_text = 7;
+    string userName = 8;
+    string databaseName = 9;
 }
 
 enum PlanGenerator
@@ -45,40 +48,17 @@ message GPMetrics {
     SpillInfo spill = 3;
 }
 
-message QueryInfoHeader {
-    int32 pid = 1;
-    GpId gpIdentity = 2;
-
-    int32 tmid = 3; /* A time identifier for a particular query. All records 
associated with the query will have the same tmid. */
-    int32 ssid = 4; /* The session id as shown by gp_session_id. All records 
associated with the query will have the same ssid */
-    int32 ccnt = 5; /* The command number within this session as shown by 
gp_command_count. All records associated with the query will have the same ccnt 
*/
-    int32 sliceid = 6; /* slice identificator, 0 means general info for the 
whole query */
+message QueryKey {
+    int32 tmid = 1; /* A time identifier for a particular query. All records 
associated with the query will have the same tmid. */
+    int32 ssid = 2; /* The session id as shown by gp_session_id. All records 
associated with the query will have the same ssid */
+    int32 ccnt = 3; /* The command number within this session as shown by 
gp_command_count. All records associated with the query will have the same ccnt 
*/
 }
 
-message GpId {
+message SegmentKey {
     int32              dbid = 1;               /* the dbid of this database */
     int32              segindex = 2;           /* content indicator: -1 for 
entry database,
                                                  * 0, ..., n-1 for segment 
database *
                                                  * a primary and its mirror 
have the same segIndex */
-    GpRole gp_role = 3;
-    GpRole gp_session_role = 4;
-}
-
-enum GpRole
-{
-    GP_ROLE_UNSPECIFIED = 0;
-    GP_ROLE_UTILITY = 1;               /* Operating as a simple database 
engine */
-    GP_ROLE_DISPATCH = 2;              /* Operating as the parallel query 
dispatcher */
-    GP_ROLE_EXECUTE = 3;               /* Operating as a parallel query 
executor */
-    GP_ROLE_UNDEFINED = 4;             /* Should never see this role in use */
-}
-
-message SessionInfo {
-    string sql = 1;
-    string userName = 2;
-    string databaseName = 3;
-    string resourceGroup = 4;
-    string applicationName = 5;
 }
 
 message SystemStat {
diff --git a/protos/yagpcc_set_service.proto b/protos/yagpcc_set_service.proto
index 0bef72891ee..97c5691a6f5 100644
--- a/protos/yagpcc_set_service.proto
+++ b/protos/yagpcc_set_service.proto
@@ -27,19 +27,19 @@ enum MetricResponseStatusCode {
 }
 
 message SetQueryReq {
-    QueryStatus query_status = 1;
+    QueryStatus           query_status = 1;
     google.protobuf.Timestamp datetime = 2;
-
-    QueryInfoHeader header = 3;
-    QueryInfo query_info = 4;
-    GPMetrics query_metrics = 5;
-    repeated MetricPlan plan_tree = 6;
+    QueryKey                 query_key = 3;
+    QueryInfo               query_info = 4;
+    GPMetrics            query_metrics = 5;
+    repeated MetricPlan      plan_tree = 6;
 }
 
 message SetPlanNodeReq {
-    PlanNodeStatus node_status = 1;
+    PlanNodeStatus         node_status = 1;
     google.protobuf.Timestamp datetime = 2;
-    QueryInfoHeader header = 3;
-    GPMetrics node_metrics = 4;
-    MetricPlan plan_node = 5;
+    QueryKey                 query_key = 3;
+    SegmentKey             segment_key = 4;
+    GPMetrics             node_metrics = 5;
+    MetricPlan               plan_node = 6;
 }
diff --git a/src/EventSender.cpp b/src/EventSender.cpp
index b1815a22bf8..d8145b811a4 100644
--- a/src/EventSender.cpp
+++ b/src/EventSender.cpp
@@ -1,11 +1,13 @@
 #include "EventSender.h"
 #include "GrpcConnector.h"
+#include "ProcStats.h"
 #include "protos/yagpcc_set_service.pb.h"
 #include <ctime>
 
 extern "C"
 {
 #include "postgres.h"
+#include "access/hash.h"
 #include "utils/metrics_utils.h"
 #include "utils/elog.h"
 #include "executor/executor.h"
@@ -18,10 +20,13 @@ extern "C"
 
 #include "tcop/utility.h"
 #include "pg_stat_statements_ya_parser.h"
+
+void get_spill_info(int ssid, int ccid, int32_t* file_count, int64_t* 
total_bytes);
 }
 
 namespace
 {
+
 std::string* get_user_name()
 {
     const char *username = GetConfigOption("session_authorization", false, 
false);
@@ -36,26 +41,6 @@ std::string* get_db_name()
     return result;
 }
 
-std::string* get_rg_name()
-{
-    auto userId = GetUserId();
-    if (!OidIsValid(userId))
-        return nullptr;
-    auto groupId = GetResGroupIdForRole(userId);
-    if (!OidIsValid(groupId))
-        return nullptr;
-    char *rgname = GetResGroupNameForId(groupId);
-    if (rgname == nullptr)
-        return nullptr;
-    pfree(rgname);
-    return new std::string(rgname);
-}
-
-std::string* get_app_name()
-{
-    return application_name ? new std::string(application_name) : nullptr;
-}
-
 int get_cur_slice_id(QueryDesc *desc)
 {
     if (!desc->estate)
@@ -75,33 +60,22 @@ google::protobuf::Timestamp current_ts()
     return current_ts;
 }
 
-void set_header(yagpcc::QueryInfoHeader *header, QueryDesc *queryDesc)
+void set_query_key(yagpcc::QueryKey *key, QueryDesc *query_desc)
 {
-    header->set_pid(MyProcPid);
-    auto gpId = header->mutable_gpidentity();
-    gpId->set_dbid(GpIdentity.dbid);
-    gpId->set_segindex(GpIdentity.segindex);
-    gpId->set_gp_role(static_cast<yagpcc::GpRole>(Gp_role));
-    gpId->set_gp_session_role(static_cast<yagpcc::GpRole>(Gp_session_role));
-    header->set_ssid(gp_session_id);
-    header->set_ccnt(gp_command_count);
-    header->set_sliceid(get_cur_slice_id(queryDesc));
+    key->set_ccnt(gp_command_count);
+    key->set_ssid(gp_session_id);
     int32 tmid = 0;
     gpmon_gettmid(&tmid);
-    header->set_tmid(tmid);
+    key->set_tmid(tmid);
 }
 
-void set_session_info(yagpcc::SessionInfo *si, QueryDesc *queryDesc)
+void set_segment_key(yagpcc::SegmentKey *key, QueryDesc *query_desc)
 {
-    if (queryDesc->sourceText)
-        *si->mutable_sql() = std::string(queryDesc->sourceText);
-    si->set_allocated_applicationname(get_app_name());
-    si->set_allocated_databasename(get_db_name());
-    si->set_allocated_resourcegroup(get_rg_name());
-    si->set_allocated_username(get_user_name());
+    key->set_dbid(GpIdentity.dbid);
+    key->set_segindex(GpIdentity.segindex);
 }
 
-ExplainState get_explain_state(QueryDesc *queryDesc, bool costs)
+ExplainState get_explain_state(QueryDesc *query_desc, bool costs)
 {
     ExplainState es;
     ExplainInitState(&es);
@@ -109,74 +83,130 @@ ExplainState get_explain_state(QueryDesc *queryDesc, bool 
costs)
     es.verbose = true;
     es.format = EXPLAIN_FORMAT_TEXT;
     ExplainBeginOutput(&es);
-    ExplainPrintPlan(&es, queryDesc);
+    ExplainPrintPlan(&es, query_desc);
     ExplainEndOutput(&es);
     return es;
 }
 
-void set_plan_text(std::string *plan_text, QueryDesc *queryDesc)
+void set_plan_text(std::string *plan_text, QueryDesc *query_desc)
 {
-    auto es = get_explain_state(queryDesc, true);
+    auto es = get_explain_state(query_desc, true);
     *plan_text = std::string(es.str->data, es.str->len);
 }
 
-void set_query_info(yagpcc::QueryInfo *qi, QueryDesc *queryDesc)
+void set_query_plan(yagpcc::QueryInfo *qi, QueryDesc *query_desc)
 {
-    set_session_info(qi->mutable_sessioninfo(), queryDesc);
-    if (queryDesc->sourceText)
-        *qi->mutable_querytext() = queryDesc->sourceText;
-    if (queryDesc->plannedstmt)
-    {
-        qi->set_generator(queryDesc->plannedstmt->planGen == PLANGEN_OPTIMIZER
+    qi->set_generator(query_desc->plannedstmt->planGen == PLANGEN_OPTIMIZER
                                 ? 
yagpcc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER
                                 : 
yagpcc::PlanGenerator::PLAN_GENERATOR_PLANNER);
-        if (queryDesc->planstate)
-        {
-            set_plan_text(qi->mutable_plantext(), queryDesc);
-            qi->set_plan_id(get_plan_id(queryDesc));
-        }
+    set_plan_text(qi->mutable_plan_text(), query_desc);
+    StringInfo norm_plan = gen_normplan(qi->plan_text().c_str());
+    *qi->mutable_temlate_plan_text() = std::string(norm_plan->data);
+    qi->set_plan_id(hash_any((unsigned char *)norm_plan->data, 
norm_plan->len));
+    //TODO: free stringinfo?
+}
+
+// Store the statement text on the QueryInfo: the source text as-is in
+// query_text, and the string produced by gen_normquery() in
+// temlate_query_text.  Dereferences query_desc->sourceText unconditionally.
+void set_query_text(yagpcc::QueryInfo *qi, QueryDesc *query_desc)
+{
+    const char *source_text = query_desc->sourceText;
+    qi->mutable_query_text()->assign(source_text);
+
+    // The buffer handed back by gen_normquery() is copied into the message
+    // and then released with pfree().
+    char *template_text = gen_normquery(source_text);
+    qi->mutable_temlate_query_text()->assign(template_text);
+    pfree(template_text);
+}
+
+void set_query_info(yagpcc::QueryInfo *qi, QueryDesc *query_desc)
+{
+    if (query_desc->sourceText)
+        set_query_text(qi, query_desc);
+    if (query_desc->plannedstmt)
+    {
+        set_query_plan(qi, query_desc);
+        qi->set_query_id(query_desc->plannedstmt->queryId);
     }
-    qi->set_query_id(queryDesc->plannedstmt->queryId);
+    qi->set_allocated_username(get_user_name());
+    qi->set_allocated_databasename(get_db_name());
+}
+
+// Copy the instrumentation attached to query_desc->planstate (tuple counts,
+// timings and buffer usage counters) into the outgoing metrics message.
+// Dereferences query_desc->planstate->instrument unconditionally.
+void set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics,
+                                QueryDesc *query_desc)
+{
+    auto *instr = query_desc->planstate->instrument;
+    auto &buf = instr->bufusage;
+
+    // Tuple counters and timings.
+    metrics->set_ntuples(instr->ntuples);
+    metrics->set_nloops(instr->nloops);
+    metrics->set_tuplecount(instr->tuplecount);
+    metrics->set_firsttuple(instr->firsttuple);
+    metrics->set_startup(instr->startup);
+    metrics->set_total(instr->total);
+
+    // Shared buffer counters.
+    metrics->set_shared_blks_hit(buf.shared_blks_hit);
+    metrics->set_shared_blks_read(buf.shared_blks_read);
+    metrics->set_shared_blks_dirtied(buf.shared_blks_dirtied);
+    metrics->set_shared_blks_written(buf.shared_blks_written);
+
+    // Local buffer counters.
+    metrics->set_local_blks_hit(buf.local_blks_hit);
+    metrics->set_local_blks_read(buf.local_blks_read);
+    metrics->set_local_blks_dirtied(buf.local_blks_dirtied);
+    metrics->set_local_blks_written(buf.local_blks_written);
+
+    // Temp block counters and block I/O timings.
+    metrics->set_temp_blks_read(buf.temp_blks_read);
+    metrics->set_temp_blks_written(buf.temp_blks_written);
+    metrics->set_blk_read_time(INSTR_TIME_GET_DOUBLE(buf.blk_read_time));
+    metrics->set_blk_write_time(INSTR_TIME_GET_DOUBLE(buf.blk_write_time));
 }
+
+// Fill the per-query GPMetrics message:
+//  - spill totals (file count, bytes) reported by get_spill_info() for the
+//    current gp_session_id / gp_command_count;
+//  - executor instrumentation, when the plan state carries any;
+//  - /proc-based statistics gathered by fill_self_stats().
+void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc)
+{
+    int32_t n_spill_files = 0;
+    int64_t n_spill_bytes = 0;
+    get_spill_info(gp_session_id, gp_command_count, &n_spill_files,
+                   &n_spill_bytes);
+    auto *spill = metrics->mutable_spill();
+    spill->set_filecount(n_spill_files);
+    spill->set_totalbytes(n_spill_bytes);
+
+    // Guard planstate itself before looking at its instrumentation: a
+    // QueryDesc without a plan state must not crash the metrics path.
+    if (query_desc->planstate && query_desc->planstate->instrument)
+        set_metric_instrumentation(metrics->mutable_instrumentation(),
+                                   query_desc);
+
+    fill_self_stats(metrics->mutable_systemstat());
+}
+
+
 } // namespace
 
-void EventSender::ExecutorStart(QueryDesc *queryDesc, int /* eflags*/)
+void EventSender::ExecutorStart(QueryDesc *query_desc, int /* eflags*/)
 {
-    elog(DEBUG1, "Query %s start recording", queryDesc->sourceText);
+    query_desc->instrument_options |= INSTRUMENT_BUFFERS;
+    query_desc->instrument_options |= INSTRUMENT_ROWS;
+    query_desc->instrument_options |= INSTRUMENT_TIMER;
+
+    elog(DEBUG1, "Query %s start recording", query_desc->sourceText);
     yagpcc::SetQueryReq req;
     req.set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START);
     *req.mutable_datetime() = current_ts();
-    set_header(req.mutable_header(), queryDesc);
-    set_query_info(req.mutable_query_info(), queryDesc);
+    set_query_key(req.mutable_query_key(), query_desc);
     auto result = connector->set_metric_query(req);
     if (result.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR)
     {
         elog(WARNING, "Query %s start reporting failed with an error %s",
-             queryDesc->sourceText, result.error_text().c_str());
+             query_desc->sourceText, result.error_text().c_str());
     }
     else
     {
-        elog(DEBUG1, "Query %s start successful", queryDesc->sourceText);
+        elog(DEBUG1, "Query %s start successful", query_desc->sourceText);
     }
 }
 
-void EventSender::ExecutorFinish(QueryDesc *queryDesc)
+void EventSender::ExecutorFinish(QueryDesc *query_desc)
 {
-    elog(DEBUG1, "Query %s finish recording", queryDesc->sourceText);
+    elog(DEBUG1, "Query %s finish recording", query_desc->sourceText);
     yagpcc::SetQueryReq req;
     req.set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE);
     *req.mutable_datetime() = current_ts();
-    set_header(req.mutable_header(), queryDesc);
-    set_query_info(req.mutable_query_info(), queryDesc);
+    set_query_key(req.mutable_query_key(), query_desc);
+    set_query_info(req.mutable_query_info(), query_desc);
+    set_gp_metrics(req.mutable_query_metrics(), query_desc);
     auto result = connector->set_metric_query(req);
     if (result.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR)
     {
         elog(WARNING, "Query %s finish reporting failed with an error %s",
-             queryDesc->sourceText, result.error_text().c_str());
+             query_desc->sourceText, result.error_text().c_str());
     }
     else
     {
-        elog(DEBUG1, "Query %s finish successful", queryDesc->sourceText);
+        elog(DEBUG1, "Query %s finish successful", query_desc->sourceText);
     }
 }
 
diff --git a/src/EventSender.h b/src/EventSender.h
index 70868f6c757..bd02455ca7e 100644
--- a/src/EventSender.h
+++ b/src/EventSender.h
@@ -9,8 +9,8 @@ struct QueryDesc;
 class EventSender
 {
 public:
-    void ExecutorStart(QueryDesc *queryDesc, int eflags);
-    void ExecutorFinish(QueryDesc *queryDesc);
+    void ExecutorStart(QueryDesc *query_desc, int eflags);
+    void ExecutorFinish(QueryDesc *query_desc);
     static EventSender *instance();
 
 private:
diff --git a/src/ProcStats.cpp b/src/ProcStats.cpp
new file mode 100644
index 00000000000..34c5d05719e
--- /dev/null
+++ b/src/ProcStats.cpp
@@ -0,0 +1,119 @@
+#include "ProcStats.h"
+#include "yagpcc_metrics.pb.h"
+#include <string>
+#include <fstream>
+#include <unistd.h>
+
+extern "C"
+{
+#include "postgres.h"
+#include "utils/elog.h"
+}
+
+namespace {
+// Read one "<label> <value>" pair from proc_stat and store the value in the
+// SystemStat field named stat_name.  The local is zero-initialized so that a
+// stream already in a failed state (e.g. the file could not be opened), which
+// leaves the extraction target untouched, reports 0 instead of an
+// indeterminate value.
+#define FILL_IO_STAT(stat_name)             \
+    uint64_t stat_name = 0;                 \
+    proc_stat >> tmp >> stat_name;          \
+    stats->set_##stat_name(stat_name);
+
+// Populate the I/O counters of `stats` from /proc/self/io.  The fields are
+// consumed in the order listed below, one label/value pair each; if the file
+// is unavailable every counter is reported as 0.
+void fill_io_stats(yagpcc::SystemStat *stats)
+{
+    std::ifstream proc_stat("/proc/self/io");
+    std::string tmp; // receives the "<name>:" label preceding each value
+    FILL_IO_STAT(rchar);
+    FILL_IO_STAT(wchar);
+    FILL_IO_STAT(syscr);
+    FILL_IO_STAT(syscw);
+    FILL_IO_STAT(read_bytes);
+    FILL_IO_STAT(write_bytes);
+    FILL_IO_STAT(cancelled_write_bytes);
+}
+
+// Populate CPU-time, running-time and memory fields of `stats` from
+// /proc/uptime and /proc/self/stat.
+//
+// /proc/self/stat is consumed as whitespace-separated tokens; the *_ID
+// constants below are zero-based token positions.  Tick values are converted
+// to seconds with sysconf(_SC_CLK_TCK).  All locals are zero-initialized so a
+// failed read yields 0 rather than an indeterminate value.
+//
+// NOTE(review): token-based parsing assumes the second field (the command
+// name in parentheses) contains no whitespace - confirm for this process.
+void fill_cpu_stats(yagpcc::SystemStat *stats)
+{
+    static const int UTIME_ID = 13;
+    static const int STIME_ID = 14;
+    static const int STARTTIME_ID = 21;
+    static const int VSIZE_ID = 22;
+    static const int RSS_ID = 23;
+    static const double tps = sysconf(_SC_CLK_TCK);
+
+    double uptime = 0;
+    {
+        std::ifstream proc_stat("/proc/uptime");
+        proc_stat >> uptime;
+    }
+
+    std::ifstream proc_stat("/proc/self/stat");
+    std::string trash;
+    double start_time = 0;
+    for (int i = 0; i <= RSS_ID; ++i)
+    {
+        switch (i)
+        {
+        case UTIME_ID:
+        {
+            double utime = 0;
+            proc_stat >> utime;
+            stats->set_usertimeseconds(utime / tps);
+            break;
+        }
+        case STIME_ID:
+        {
+            double stime = 0;
+            proc_stat >> stime;
+            stats->set_kerneltimeseconds(stime / tps);
+            break;
+        }
+        case STARTTIME_ID:
+        {
+            uint64_t starttime = 0;
+            proc_stat >> starttime;
+            start_time = static_cast<double>(starttime) / tps;
+            break;
+        }
+        case VSIZE_ID:
+        {
+            uint64_t vsize = 0;
+            proc_stat >> vsize;
+            stats->set_vsize(vsize);
+            break;
+        }
+        case RSS_ID:
+        {
+            uint64_t rss = 0;
+            proc_stat >> rss;
+            // NOTE(review): proc(5) describes rss as a number of pages, not
+            // bytes - confirm the consumer expects that unit.
+            stats->set_rss(rss);
+            break;
+        }
+        default:
+            // Field we do not report: consume and discard the token.
+            proc_stat >> trash;
+            break;
+        }
+    }
+
+    // Both inputs are final only after the loop, so compute this once here.
+    stats->set_runningtimeseconds(uptime - start_time);
+}
+
+// Populate the VmPeak/VmSize fields of `stats` from /proc/self/status.
+//
+// The file is scanned token by token; for the two keys of interest the
+// following token is the numeric value and the one after it the unit, which
+// must be "kB".  Any other unit is reported with elog(FATAL).
+void fill_status_stats(yagpcc::SystemStat *stats)
+{
+    std::ifstream proc_stat("/proc/self/status");
+    std::string key, measure;
+    while (proc_stat >> key)
+    {
+        if (key == "VmPeak:")
+        {
+            uint64_t value = 0;
+            proc_stat >> value;
+            stats->set_vmpeakkb(value);
+            proc_stat >> measure;
+            if (measure != "kB")
+                elog(FATAL, "Expected memory sizes in kB, but got in %s", measure.c_str());
+        }
+        else if (key == "VmSize:")
+        {
+            uint64_t value = 0;
+            proc_stat >> value;
+            stats->set_vmsizekb(value);
+            // Read this line's own unit token; without it the check below
+            // would look at whatever an earlier line left in `measure`.
+            proc_stat >> measure;
+            if (measure != "kB")
+                elog(FATAL, "Expected memory sizes in kB, but got in %s", measure.c_str());
+        }
+    }
+}
+} // namespace
+
+// Fill `stats` with statistics of the current process by running, in order,
+// the I/O reader (/proc/self/io), the CPU/memory reader (/proc/uptime and
+// /proc/self/stat) and the status reader (/proc/self/status).
+void fill_self_stats(yagpcc::SystemStat *stats)
+{
+    fill_io_stats(stats);
+    fill_cpu_stats(stats);
+    fill_status_stats(stats);
+}
\ No newline at end of file
diff --git a/src/ProcStats.h b/src/ProcStats.h
new file mode 100644
index 00000000000..30a90a60519
--- /dev/null
+++ b/src/ProcStats.h
@@ -0,0 +1,7 @@
+#pragma once
+
+namespace yagpcc {
+class SystemStat;
+}
+
+void fill_self_stats(yagpcc::SystemStat *stats);
\ No newline at end of file
diff --git a/src/SpillInfoWrapper.c b/src/SpillInfoWrapper.c
new file mode 100644
index 00000000000..c6ace0a693f
--- /dev/null
+++ b/src/SpillInfoWrapper.c
@@ -0,0 +1,21 @@
+#include "postgres.h"
+#include "utils/workfile_mgr.h"
+
+void get_spill_info(int ssid, int ccid, int32_t* file_count, int64_t* 
total_bytes);
+
+/*
+ * Add up the file count and byte total of every active workfile set that
+ * belongs to session `ssid` and command `ccid`.
+ *
+ * The totals are added onto *file_count and *total_bytes, which are not
+ * reset here.  The array returned by workfile_mgr_cache_entries_get_copy()
+ * is released with pfree() before returning.
+ */
+void get_spill_info(int ssid, int ccid, int32_t* file_count,
+                    int64_t* total_bytes)
+{
+    int n_sets = 0;
+    int idx;
+    workfile_set *sets = workfile_mgr_cache_entries_get_copy(&n_sets);
+
+    for (idx = 0; idx < n_sets; ++idx)
+    {
+        const workfile_set *set = &sets[idx];
+
+        /* Skip inactive sets and sets owned by another session/command */
+        if (!set->active)
+            continue;
+        if (set->session_id != ssid || set->command_count != ccid)
+            continue;
+
+        *file_count += set->num_files;
+        *total_bytes += set->total_bytes;
+    }
+    pfree(sets);
+}
\ No newline at end of file
diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp
index 9f3200c006f..1dabb59ab3f 100644
--- a/src/hook_wrappers.cpp
+++ b/src/hook_wrappers.cpp
@@ -19,8 +19,8 @@ extern "C"
 static ExecutorStart_hook_type previous_ExecutorStart_hook = nullptr;
 static ExecutorFinish_hook_type previous_ExecutorFinish_hook = nullptr;
 
-static void ya_ExecutorStart_hook(QueryDesc *queryDesc, int eflags);
-static void ya_ExecutorFinish_hook(QueryDesc *queryDesc);
+static void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags);
+static void ya_ExecutorFinish_hook(QueryDesc *query_desc);
 
 #define REPLACE_HOOK(hookName)      \
     previous_##hookName = hookName; \
@@ -56,12 +56,22 @@ void hooks_deinit()
     else                                                                   \
         standard_##hookName(__VA_ARGS__);
 
-void ya_ExecutorStart_hook(QueryDesc *queryDesc, int eflags)
+void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags)
 {
-    CREATE_HOOK_WRAPPER(ExecutorStart, queryDesc, eflags);
+    CREATE_HOOK_WRAPPER(ExecutorStart, query_desc, eflags);
+    PG_TRY();
+    {
+        EventSender::instance()->ExecutorStart(query_desc, eflags);
+    }
+    PG_CATCH();
+    {
+        ereport(WARNING, (errmsg("EventSender failed in ExecutorStart 
afterhook")));
+        PG_RE_THROW();
+    }
+    PG_END_TRY();
 }
 
-void ya_ExecutorFinish_hook(QueryDesc *queryDesc)
+void ya_ExecutorFinish_hook(QueryDesc *query_desc)
 {
-    CREATE_HOOK_WRAPPER(ExecutorFinish, queryDesc);
+    CREATE_HOOK_WRAPPER(ExecutorFinish, query_desc);
 }
\ No newline at end of file
diff --git a/src/stat_statements_parser/pg_stat_statements_ya_parser.c 
b/src/stat_statements_parser/pg_stat_statements_ya_parser.c
index f14742337bd..ae79e7dc40a 100644
--- a/src/stat_statements_parser/pg_stat_statements_ya_parser.c
+++ b/src/stat_statements_parser/pg_stat_statements_ya_parser.c
@@ -1,3 +1,6 @@
+// NOTE: this file is just a bunch of code borrowed from pg_stat_statements 
for PG 9.4
+// and from our own in-house implementation of pg_stat_statements for managed PG
+
 #include "postgres.h"
 
 #include <sys/stat.h>
@@ -67,14 +70,15 @@ static void JumbleQuery(pgssJumbleState *jstate, Query 
*query);
 static void JumbleRangeTable(pgssJumbleState *jstate, List *rtable);
 static void JumbleExpr(pgssJumbleState *jstate, Node *node);
 static void RecordConstLocation(pgssJumbleState *jstate, int location);
-
-static StringInfo gen_normplan(const char *execution_plan);
-
+static void fill_in_constant_lengths(pgssJumbleState *jstate, const char 
*query);
+static int comp_location(const void *a, const void *b);
+StringInfo gen_normplan(const char *execution_plan);
 static bool need_replace(int token);
-
 void pgss_post_parse_analyze(ParseState *pstate, Query *query);
+static char *generate_normalized_query(pgssJumbleState *jstate, const char 
*query,
+                                                                          int 
*query_len_p, int encoding);
 
-void stat_statements_parser_init()
+       void stat_statements_parser_init()
 {
        prev_post_parse_analyze_hook = post_parse_analyze_hook;
        post_parse_analyze_hook = pgss_post_parse_analyze;
@@ -650,7 +654,7 @@ need_replace(int token)
  * gen_normplan - parse execution plan using flex and replace all CONST to
  * substitute variables.
  */
-static StringInfo
+StringInfo
 gen_normplan(const char *execution_plan)
 {
        core_yyscan_t yyscanner;
@@ -715,14 +719,6 @@ gen_normplan(const char *execution_plan)
        return plan_out;
 }
 
-uint64_t get_plan_id(QueryDesc *queryDesc)
-{
-       if (!queryDesc->sourceText)
-               return 0;
-       StringInfo normalized = gen_normplan(queryDesc->sourceText);
-       return hash_any((unsigned char *)normalized->data, normalized->len);
-}
-
 /*
  * Post-parse-analysis hook: mark query with a queryId
  */
@@ -768,4 +764,228 @@ void pgss_post_parse_analyze(ParseState *pstate, Query 
*query)
         */
        if (query->queryId == 0)
                query->queryId = 1;
+}
+
+/*
+ * comp_location: comparator for qsorting pgssLocationLen structs by location
+ */
+static int
+comp_location(const void *a, const void *b)
+{
+       const pgssLocationLen *lhs = (const pgssLocationLen *) a;
+       const pgssLocationLen *rhs = (const pgssLocationLen *) b;
+
+       /* (x > y) - (x < y) yields -1, 0 or +1 without any subtraction overflow */
+       return (lhs->location > rhs->location) - (lhs->location < rhs->location);
+}
+
+/*
+ * Given a valid SQL string and an array of constant-location records,
+ * fill in the textual lengths of those constants.
+ *
+ * The constants may use any allowed constant syntax, such as float literals,
+ * bit-strings, single-quoted strings and dollar-quoted strings.  This is
+ * accomplished by using the public API for the core scanner.
+ *
+ * It is the caller's job to ensure that the string is a valid SQL statement
+ * with constants at the indicated locations.  Since in practice the string
+ * has already been parsed, and the locations that the caller provides will
+ * have originated from within the authoritative parser, this should not be
+ * a problem.
+ *
+ * Duplicate constant pointers are possible, and will have their lengths
+ * marked as '-1', so that they are later ignored.  (Actually, we assume the
+ * lengths were initialized as -1 to start with, and don't change them here.)
+ *
+ * N.B. There is an assumption that a '-' character at a Const location begins
+ * a negative numeric constant.  This precludes there ever being another
+ * reason for a constant to start with a '-'.
+ */
+static void
+fill_in_constant_lengths(pgssJumbleState *jstate, const char *query)
+{
+       pgssLocationLen *locs;
+       core_yyscan_t yyscanner;
+       core_yy_extra_type yyextra;
+       core_YYSTYPE yylval;
+       YYLTYPE         yylloc;
+       int                     last_loc = -1;
+       int                     i;
+
+       /*
+        * Sort the records by location so that we can process them in order 
while
+        * scanning the query text.
+        */
+       if (jstate->clocations_count > 1)
+               qsort(jstate->clocations, jstate->clocations_count,
+                         sizeof(pgssLocationLen), comp_location);
+       locs = jstate->clocations;
+
+       /* initialize the flex scanner --- should match raw_parser() */
+       yyscanner = scanner_init(query,
+                                                        &yyextra,
+                                                        ScanKeywords,
+                                                        NumScanKeywords);
+
+       /* Search for each constant, in sequence */
+       for (i = 0; i < jstate->clocations_count; i++)
+       {
+               int                     loc = locs[i].location;
+               int                     tok;
+
+               Assert(loc >= 0);
+
+               if (loc <= last_loc)
+                       continue;                       /* Duplicate constant, 
ignore */
+
+               /* Lex tokens until we find the desired constant */
+               for (;;)
+               {
+                       tok = core_yylex(&yylval, &yylloc, yyscanner);
+
+                       /* We should not hit end-of-string, but if we do, 
behave sanely */
+                       if (tok == 0)
+                               break;                  /* out of inner 
for-loop */
+
+                       /*
+                        * We should find the token position exactly, but if we 
somehow
+                        * run past it, work with that.
+                        */
+                       if (yylloc >= loc)
+                       {
+                               if (query[loc] == '-')
+                               {
+                                       /*
+                                        * It's a negative value - this is the 
one and only case
+                                        * where we replace more than a single 
token.
+                                        *
+                                        * Do not compensate for the core 
system's special-case
+                                        * adjustment of location to that of 
the leading '-'
+                                        * operator in the event of a negative 
constant.  It is
+                                        * also useful for our purposes to 
start from the minus
+                                        * symbol.  In this way, queries like 
"select * from foo
+                                        * where bar = 1" and "select * from 
foo where bar = -2"
+                                        * will have identical normalized query 
strings.
+                                        */
+                                       tok = core_yylex(&yylval, &yylloc, 
yyscanner);
+                                       if (tok == 0)
+                                               break;  /* out of inner 
for-loop */
+                               }
+
+                               /*
+                                * We now rely on the assumption that flex has 
placed a zero
+                                * byte after the text of the current token in 
scanbuf.
+                                */
+                               locs[i].length = strlen(yyextra.scanbuf + loc);
+                               break;                  /* out of inner 
for-loop */
+                       }
+               }
+
+               /* If we hit end-of-string, give up, leaving remaining lengths 
-1 */
+               if (tok == 0)
+                       break;
+
+               /* Later records at or before this location are skipped as duplicates */
+               last_loc = loc;
+       }
+
+       /* Tear down the scanner created by scanner_init() above */
+       scanner_finish(yyscanner);
+}
+
+/*
+ * Generate a normalized version of the query string that will be used to
+ * represent all similar queries.
+ *
+ * Note that the normalized representation may well vary depending on
+ * just which "equivalent" query is used to create the hashtable entry.
+ * We assume this is OK.
+ *
+ * *query_len_p contains the input string length, and is updated with
+ * the result string length (which cannot be longer) on exit.
+ *
+ * Returns a palloc'd string.
+ */
+static char *
+generate_normalized_query(pgssJumbleState *jstate, const char *query,
+                                                 int *query_len_p, int 
encoding)
+{
+       char       *norm_query;
+       int                     query_len = *query_len_p;
+       int                     i,
+                               len_to_wrt,             /* Length (in bytes) to 
write */
+                               quer_loc = 0,   /* Source query byte location */
+                               n_quer_loc = 0, /* Normalized query byte 
location */
+                               last_off = 0,   /* Offset from start for 
previous tok */
+                               last_tok_len = 0;               /* Length (in 
bytes) of that tok */
+
+       /*
+        * Get constants' lengths (core system only gives us locations).  Note
+        * this also ensures the items are sorted by location.
+        */
+       fill_in_constant_lengths(jstate, query);
+
+       /* Allocate result buffer */
+       /* The extra byte holds the terminating NUL written at the end */
+       norm_query = palloc(query_len + 1);
+
+       /*
+        * When clocations_count is 0 this loop does not run and the whole query
+        * is copied unchanged by the tail copy below.
+        */
+       for (i = 0; i < jstate->clocations_count; i++)
+       {
+               int                     off,            /* Offset from start 
for cur tok */
+                                       tok_len;        /* Length (in bytes) of 
that tok */
+
+               off = jstate->clocations[i].location;
+               tok_len = jstate->clocations[i].length;
+
+               if (tok_len < 0)
+                       continue;                       /* ignore any 
duplicates */
+
+               /* Copy next chunk (what precedes the next constant) */
+               len_to_wrt = off - last_off;
+               len_to_wrt -= last_tok_len;
+
+               Assert(len_to_wrt >= 0);
+               memcpy(norm_query + n_quer_loc, query + quer_loc, len_to_wrt);
+               n_quer_loc += len_to_wrt;
+
+               /* And insert a '?' in place of the constant token */
+               norm_query[n_quer_loc++] = '?';
+
+               /* The next copy resumes in the source right after this constant */
+               quer_loc = off + tok_len;
+               last_off = off;
+               last_tok_len = tok_len;
+       }
+
+       /*
+        * We've copied up until the last ignorable constant.  Copy over the
+        * remaining bytes of the original query string.
+        */
+       len_to_wrt = query_len - quer_loc;
+
+       Assert(len_to_wrt >= 0);
+       memcpy(norm_query + n_quer_loc, query + quer_loc, len_to_wrt);
+       n_quer_loc += len_to_wrt;
+
+       Assert(n_quer_loc <= query_len);
+       norm_query[n_quer_loc] = '\0';
+
+       /* Report the length of the normalized text back to the caller */
+       *query_len_p = n_quer_loc;
+       return norm_query;
+}
+
+/*
+ * gen_normquery - build the normalized ("template") text for a query string.
+ *
+ * Returns NULL when query is NULL; otherwise returns a palloc'd,
+ * NUL-terminated string that the caller must pfree().
+ *
+ * NOTE(review): the jumble state is created here with clocations_count = 0
+ * and nothing in this function records constant locations, so
+ * generate_normalized_query() currently returns an unmodified copy of the
+ * input.  Confirm whether constant locations are meant to be collected first.
+ */
+char *gen_normquery(const char *query)
+{
+       pgssJumbleState jstate;
+       char       *norm_query;
+       int                     query_len;
+
+       if (!query) {
+               return NULL;
+       }
+
+       /*
+        * The normalization path only reads clocations/clocations_count, so no
+        * jumble buffer is allocated.
+        */
+       jstate.jumble = NULL;
+       jstate.jumble_len = 0;
+       jstate.clocations_buf_size = 32;
+       jstate.clocations = (pgssLocationLen *)
+               palloc(jstate.clocations_buf_size * sizeof(pgssLocationLen));
+       jstate.clocations_count = 0;
+
+       query_len = strlen(query);
+       norm_query = generate_normalized_query(&jstate, query, &query_len,
+                                                                                  GetDatabaseEncoding());
+
+       /* The result is a separate allocation; release the scratch array */
+       pfree(jstate.clocations);
+       return norm_query;
 }
\ No newline at end of file
diff --git a/src/stat_statements_parser/pg_stat_statements_ya_parser.h 
b/src/stat_statements_parser/pg_stat_statements_ya_parser.h
index 274f96aebaf..aa9cd217e31 100644
--- a/src/stat_statements_parser/pg_stat_statements_ya_parser.h
+++ b/src/stat_statements_parser/pg_stat_statements_ya_parser.h
@@ -12,4 +12,5 @@ extern void stat_statements_parser_deinit(void);
 }
 #endif
 
-uint64_t get_plan_id(QueryDesc *queryDesc);
\ No newline at end of file
+StringInfo gen_normplan(const char *executionPlan);
+char *gen_normquery(const char *query);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to