This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit b3bf74265f47931ac910dbd053239f7cc1d03588 Author: Maxim Smyatkin <[email protected]> AuthorDate: Wed Mar 29 16:10:20 2023 +0300 [yagp_hooks_collector] Add executor instrumentation, /proc stats, and normalized texts Collect spill info (file count, bytes written). Generate normalized query and plan texts using a pg_stat_statements-derived parser. Collect buffer I/O counters, tuple counts, timing, and /proc/self CPU/memory/IO statistics. --- Makefile | 1 - protos/yagpcc_metrics.proto | 42 +--- protos/yagpcc_set_service.proto | 20 +- src/EventSender.cpp | 164 ++++++++------ src/EventSender.h | 4 +- src/ProcStats.cpp | 119 ++++++++++ src/ProcStats.h | 7 + src/SpillInfoWrapper.c | 21 ++ src/hook_wrappers.cpp | 22 +- .../pg_stat_statements_ya_parser.c | 248 +++++++++++++++++++-- .../pg_stat_statements_ya_parser.h | 3 +- 11 files changed, 519 insertions(+), 132 deletions(-) diff --git a/Makefile b/Makefile index 15c5dabb70e..0a21cf136ff 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,6 @@ # AIX make defaults to building *every* target of the first rule. Start with # a single-target, empty rule to make the other targets non-default. -all: all check install installdirs installcheck installcheck-parallel uninstall clean distclean maintainer-clean dist distcheck world check-world install-world installcheck-world installcheck-resgroup installcheck-resgroup-v2: @if [ ! -f GNUmakefile ] ; then \ diff --git a/protos/yagpcc_metrics.proto b/protos/yagpcc_metrics.proto index b7e255484c7..f00f329a208 100644 --- a/protos/yagpcc_metrics.proto +++ b/protos/yagpcc_metrics.proto @@ -27,9 +27,12 @@ message QueryInfo { PlanGenerator generator = 1; uint64 query_id = 2; uint64 plan_id = 3; - string queryText = 4; - string planText = 5; - SessionInfo sessionInfo = 6; + string query_text = 4; + string plan_text = 5; + string temlate_query_text = 6; + string temlate_plan_text = 7; + string userName = 8; + string databaseName = 9; } enum PlanGenerator @@ -45,40 +48,17 @@ message GPMetrics { SpillInfo spill = 3; } -message QueryInfoHeader { - int32 pid = 1; - GpId gpIdentity = 2; - - int32 tmid = 3; /* A time identifier for a particular query. All records associated with the query will have the same tmid. */ - int32 ssid = 4; /* The session id as shown by gp_session_id. All records associated with the query will have the same ssid */ - int32 ccnt = 5; /* The command number within this session as shown by gp_command_count. All records associated with the query will have the same ccnt */ - int32 sliceid = 6; /* slice identificator, 0 means general info for the whole query */ +message QueryKey { + int32 tmid = 1; /* A time identifier for a particular query. All records associated with the query will have the same tmid. */ + int32 ssid = 2; /* The session id as shown by gp_session_id. All records associated with the query will have the same ssid */ + int32 ccnt = 3; /* The command number within this session as shown by gp_command_count. All records associated with the query will have the same ccnt */ } -message GpId { +message SegmentKey { int32 dbid = 1; /* the dbid of this database */ int32 segindex = 2; /* content indicator: -1 for entry database, * 0, ..., n-1 for segment database * * a primary and its mirror have the same segIndex */ - GpRole gp_role = 3; - GpRole gp_session_role = 4; -} - -enum GpRole -{ - GP_ROLE_UNSPECIFIED = 0; - GP_ROLE_UTILITY = 1; /* Operating as a simple database engine */ - GP_ROLE_DISPATCH = 2; /* Operating as the parallel query dispatcher */ - GP_ROLE_EXECUTE = 3; /* Operating as a parallel query executor */ - GP_ROLE_UNDEFINED = 4; /* Should never see this role in use */ -} - -message SessionInfo { - string sql = 1; - string userName = 2; - string databaseName = 3; - string resourceGroup = 4; - string applicationName = 5; } message SystemStat { diff --git a/protos/yagpcc_set_service.proto b/protos/yagpcc_set_service.proto index 0bef72891ee..97c5691a6f5 100644 --- a/protos/yagpcc_set_service.proto +++ b/protos/yagpcc_set_service.proto @@ -27,19 +27,19 @@ enum MetricResponseStatusCode { } message SetQueryReq { - QueryStatus query_status = 1; + QueryStatus query_status = 1; google.protobuf.Timestamp datetime = 2; - - QueryInfoHeader header = 3; - QueryInfo query_info = 4; - GPMetrics query_metrics = 5; - repeated MetricPlan plan_tree = 6; + QueryKey query_key = 3; + QueryInfo query_info = 4; + GPMetrics query_metrics = 5; + repeated MetricPlan plan_tree = 6; } message SetPlanNodeReq { - PlanNodeStatus node_status = 1; + PlanNodeStatus node_status = 1; google.protobuf.Timestamp datetime = 2; - QueryInfoHeader header = 3; - GPMetrics node_metrics = 4; - MetricPlan plan_node = 5; + QueryKey query_key = 3; + SegmentKey segment_key = 4; + GPMetrics node_metrics = 5; + MetricPlan plan_node = 6; } diff --git a/src/EventSender.cpp b/src/EventSender.cpp index b1815a22bf8..d8145b811a4 100644 --- a/src/EventSender.cpp +++ b/src/EventSender.cpp @@ -1,11 +1,13 @@ #include "EventSender.h" #include "GrpcConnector.h" +#include "ProcStats.h" #include "protos/yagpcc_set_service.pb.h" #include <ctime> extern "C" { #include "postgres.h" +#include "access/hash.h" #include "utils/metrics_utils.h" #include "utils/elog.h" #include "executor/executor.h" @@ -18,10 +20,13 @@ extern "C" #include "tcop/utility.h" #include "pg_stat_statements_ya_parser.h" + +void get_spill_info(int ssid, int ccid, int32_t* file_count, int64_t* total_bytes); } namespace { + std::string* get_user_name() { const char *username = GetConfigOption("session_authorization", false, false); @@ -36,26 +41,6 @@ std::string* get_db_name() return result; } -std::string* get_rg_name() -{ - auto userId = GetUserId(); - if (!OidIsValid(userId)) - return nullptr; - auto groupId = GetResGroupIdForRole(userId); - if (!OidIsValid(groupId)) - return nullptr; - char *rgname = GetResGroupNameForId(groupId); - if (rgname == nullptr) - return nullptr; - pfree(rgname); - return new std::string(rgname); -} - -std::string* get_app_name() -{ - return application_name ? new std::string(application_name) : nullptr; -} - int get_cur_slice_id(QueryDesc *desc) { if (!desc->estate) @@ -75,33 +60,22 @@ google::protobuf::Timestamp current_ts() return current_ts; } -void set_header(yagpcc::QueryInfoHeader *header, QueryDesc *queryDesc) +void set_query_key(yagpcc::QueryKey *key, QueryDesc *query_desc) { - header->set_pid(MyProcPid); - auto gpId = header->mutable_gpidentity(); - gpId->set_dbid(GpIdentity.dbid); - gpId->set_segindex(GpIdentity.segindex); - gpId->set_gp_role(static_cast<yagpcc::GpRole>(Gp_role)); - gpId->set_gp_session_role(static_cast<yagpcc::GpRole>(Gp_session_role)); - header->set_ssid(gp_session_id); - header->set_ccnt(gp_command_count); - header->set_sliceid(get_cur_slice_id(queryDesc)); + key->set_ccnt(gp_command_count); + key->set_ssid(gp_session_id); int32 tmid = 0; gpmon_gettmid(&tmid); - header->set_tmid(tmid); + key->set_tmid(tmid); } -void set_session_info(yagpcc::SessionInfo *si, QueryDesc *queryDesc) +void set_segment_key(yagpcc::SegmentKey *key, QueryDesc *query_desc) { - if (queryDesc->sourceText) - *si->mutable_sql() = std::string(queryDesc->sourceText); - si->set_allocated_applicationname(get_app_name()); - si->set_allocated_databasename(get_db_name()); - si->set_allocated_resourcegroup(get_rg_name()); - si->set_allocated_username(get_user_name()); + key->set_dbid(GpIdentity.dbid); + key->set_segindex(GpIdentity.segindex); } -ExplainState get_explain_state(QueryDesc *queryDesc, bool costs) +ExplainState get_explain_state(QueryDesc *query_desc, bool costs) { ExplainState es; ExplainInitState(&es); @@ -109,74 +83,130 @@ ExplainState get_explain_state(QueryDesc *queryDesc, bool costs) es.verbose = true; es.format = EXPLAIN_FORMAT_TEXT; ExplainBeginOutput(&es); - ExplainPrintPlan(&es, queryDesc); + ExplainPrintPlan(&es, query_desc); ExplainEndOutput(&es); return es; } -void set_plan_text(std::string *plan_text, QueryDesc *queryDesc) +void set_plan_text(std::string *plan_text, QueryDesc *query_desc) { - auto es = get_explain_state(queryDesc, true); + auto es = get_explain_state(query_desc, true); *plan_text = std::string(es.str->data, es.str->len); } -void set_query_info(yagpcc::QueryInfo *qi, QueryDesc *queryDesc) +void set_query_plan(yagpcc::QueryInfo *qi, QueryDesc *query_desc) { - set_session_info(qi->mutable_sessioninfo(), queryDesc); - if (queryDesc->sourceText) - *qi->mutable_querytext() = queryDesc->sourceText; - if (queryDesc->plannedstmt) - { - qi->set_generator(queryDesc->plannedstmt->planGen == PLANGEN_OPTIMIZER + qi->set_generator(query_desc->plannedstmt->planGen == PLANGEN_OPTIMIZER ? yagpcc::PlanGenerator::PLAN_GENERATOR_OPTIMIZER : yagpcc::PlanGenerator::PLAN_GENERATOR_PLANNER); - if (queryDesc->planstate) - { - set_plan_text(qi->mutable_plantext(), queryDesc); - qi->set_plan_id(get_plan_id(queryDesc)); - } + set_plan_text(qi->mutable_plan_text(), query_desc); + StringInfo norm_plan = gen_normplan(qi->plan_text().c_str()); + *qi->mutable_temlate_plan_text() = std::string(norm_plan->data); + qi->set_plan_id(hash_any((unsigned char *)norm_plan->data, norm_plan->len)); + //TODO: free stringinfo? +} + +void set_query_text(yagpcc::QueryInfo *qi, QueryDesc *query_desc) +{ + *qi->mutable_query_text() = query_desc->sourceText; + char* norm_query = gen_normquery(query_desc->sourceText); + *qi->mutable_temlate_query_text() = std::string(norm_query); + pfree(norm_query); +} + +void set_query_info(yagpcc::QueryInfo *qi, QueryDesc *query_desc) +{ + if (query_desc->sourceText) + set_query_text(qi, query_desc); + if (query_desc->plannedstmt) + { + set_query_plan(qi, query_desc); + qi->set_query_id(query_desc->plannedstmt->queryId); } - qi->set_query_id(queryDesc->plannedstmt->queryId); + qi->set_allocated_username(get_user_name()); + qi->set_allocated_databasename(get_db_name()); +} + +void set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics, QueryDesc *query_desc) +{ + auto instrument = query_desc->planstate->instrument; + metrics->set_ntuples(instrument->ntuples); + metrics->set_nloops(instrument->nloops); + metrics->set_tuplecount(instrument->tuplecount); + metrics->set_firsttuple(instrument->firsttuple); + metrics->set_startup(instrument->startup); + metrics->set_total(instrument->total); + auto &buffusage = instrument->bufusage; + metrics->set_shared_blks_hit(buffusage.shared_blks_hit); + metrics->set_shared_blks_read(buffusage.shared_blks_read); + metrics->set_shared_blks_dirtied(buffusage.shared_blks_dirtied); + metrics->set_shared_blks_written(buffusage.shared_blks_written); + metrics->set_local_blks_hit(buffusage.local_blks_hit); + metrics->set_local_blks_read(buffusage.local_blks_read); + metrics->set_local_blks_dirtied(buffusage.local_blks_dirtied); + metrics->set_local_blks_written(buffusage.local_blks_written); + metrics->set_temp_blks_read(buffusage.temp_blks_read); + metrics->set_temp_blks_written(buffusage.temp_blks_written); + metrics->set_blk_read_time(INSTR_TIME_GET_DOUBLE(buffusage.blk_read_time)); + metrics->set_blk_write_time(INSTR_TIME_GET_DOUBLE(buffusage.blk_write_time)); } + +void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc) +{ + int32_t n_spill_files = 0; + int64_t n_spill_bytes = 0; + get_spill_info(gp_session_id, gp_command_count, &n_spill_files, &n_spill_bytes); + metrics->mutable_spill()->set_filecount(n_spill_files); + metrics->mutable_spill()->set_totalbytes(n_spill_bytes); + if (query_desc->planstate->instrument) + set_metric_instrumentation(metrics->mutable_instrumentation(), query_desc); + fill_self_stats(metrics->mutable_systemstat()); +} + + } // namespace -void EventSender::ExecutorStart(QueryDesc *queryDesc, int /* eflags*/) +void EventSender::ExecutorStart(QueryDesc *query_desc, int /* eflags*/) { - elog(DEBUG1, "Query %s start recording", queryDesc->sourceText); + query_desc->instrument_options |= INSTRUMENT_BUFFERS; + query_desc->instrument_options |= INSTRUMENT_ROWS; + query_desc->instrument_options |= INSTRUMENT_TIMER; + + elog(DEBUG1, "Query %s start recording", query_desc->sourceText); yagpcc::SetQueryReq req; req.set_query_status(yagpcc::QueryStatus::QUERY_STATUS_START); *req.mutable_datetime() = current_ts(); - set_header(req.mutable_header(), queryDesc); - set_query_info(req.mutable_query_info(), queryDesc); + set_query_key(req.mutable_query_key(), query_desc); auto result = connector->set_metric_query(req); if (result.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR) { elog(WARNING, "Query %s start reporting failed with an error %s", - queryDesc->sourceText, result.error_text().c_str()); + query_desc->sourceText, result.error_text().c_str()); } else { - elog(DEBUG1, "Query %s start successful", queryDesc->sourceText); + elog(DEBUG1, "Query %s start successful", query_desc->sourceText); } } -void EventSender::ExecutorFinish(QueryDesc *queryDesc) +void EventSender::ExecutorFinish(QueryDesc *query_desc) { - elog(DEBUG1, "Query %s finish recording", queryDesc->sourceText); + elog(DEBUG1, "Query %s finish recording", query_desc->sourceText); yagpcc::SetQueryReq req; req.set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE); *req.mutable_datetime() = current_ts(); - set_header(req.mutable_header(), queryDesc); - set_query_info(req.mutable_query_info(), queryDesc); + set_query_key(req.mutable_query_key(), query_desc); + set_query_info(req.mutable_query_info(), query_desc); + set_gp_metrics(req.mutable_query_metrics(), query_desc); auto result = connector->set_metric_query(req); if (result.error_code() == yagpcc::METRIC_RESPONSE_STATUS_CODE_ERROR) { elog(WARNING, "Query %s finish reporting failed with an error %s", - queryDesc->sourceText, result.error_text().c_str()); + query_desc->sourceText, result.error_text().c_str()); } else { - elog(DEBUG1, "Query %s finish successful", queryDesc->sourceText); + elog(DEBUG1, "Query %s finish successful", query_desc->sourceText); } } diff --git a/src/EventSender.h b/src/EventSender.h index 70868f6c757..bd02455ca7e 100644 --- a/src/EventSender.h +++ b/src/EventSender.h @@ -9,8 +9,8 @@ struct QueryDesc; class EventSender { public: - void ExecutorStart(QueryDesc *queryDesc, int eflags); - void ExecutorFinish(QueryDesc *queryDesc); + void ExecutorStart(QueryDesc *query_desc, int eflags); + void ExecutorFinish(QueryDesc *query_desc); static EventSender *instance(); private: diff --git a/src/ProcStats.cpp b/src/ProcStats.cpp new file mode 100644 index 00000000000..34c5d05719e --- /dev/null +++ b/src/ProcStats.cpp @@ -0,0 +1,119 @@ +#include "ProcStats.h" +#include "yagpcc_metrics.pb.h" +#include <string> +#include <fstream> +#include <unistd.h> + +extern "C" +{ +#include "postgres.h" +#include "utils/elog.h" +} + +namespace { +#define FILL_IO_STAT(stat_name) \ + uint64_t stat_name; \ + proc_stat >> tmp >> stat_name; \ + stats->set_##stat_name(stat_name); + +void fill_io_stats(yagpcc::SystemStat *stats) +{ + std::ifstream proc_stat("/proc/self/io"); + std::string tmp; + FILL_IO_STAT(rchar); + FILL_IO_STAT(wchar); + FILL_IO_STAT(syscr); + FILL_IO_STAT(syscw); + FILL_IO_STAT(read_bytes); + FILL_IO_STAT(write_bytes); + FILL_IO_STAT(cancelled_write_bytes); +} + +void fill_cpu_stats(yagpcc::SystemStat *stats) +{ + static const int UTIME_ID = 13; + static const int STIME_ID = 14; + static const int STARTTIME_ID = 21; + static const int VSIZE_ID = 22; + static const int RSS_ID = 23; + static const double tps = sysconf(_SC_CLK_TCK); + + double uptime; + { + std::ifstream proc_stat("/proc/uptime"); + proc_stat >> uptime; + } + + std::ifstream proc_stat("/proc/self/stat"); + std::string trash; + double start_time = 0; + for (int i = 0; i <= RSS_ID; ++i) + { + switch (i) + { + case UTIME_ID: + double utime; + proc_stat >> utime; + stats->set_usertimeseconds(utime / tps); + break; + case STIME_ID: + double stime; + proc_stat >> stime; + stats->set_kerneltimeseconds(stime / tps); + break; + case STARTTIME_ID: + uint64_t starttime; + proc_stat >> starttime; + start_time = static_cast<double>(starttime) / tps; + break; + case VSIZE_ID: + uint64_t vsize; + proc_stat >> vsize; + stats->set_vsize(vsize); + break; + case RSS_ID: + uint64_t rss; + proc_stat >> rss; + // NOTE: this is a double AFAIU, need to double-check + stats->set_rss(rss); + break; + default: + proc_stat >> trash; + } + stats->set_runningtimeseconds(uptime - start_time); + } +} + +void fill_status_stats(yagpcc::SystemStat *stats) +{ + std::ifstream proc_stat("/proc/self/status"); + std::string key, measure; + while (proc_stat >> key) + { + if (key == "VmPeak:") + { + uint64_t value; + proc_stat >> value; + stats->set_vmpeakkb(value); + proc_stat >> measure; + if (measure != "kB") + elog(FATAL, "Expected memory sizes in kB, but got in %s", measure.c_str()); + } + else if (key == "VmSize:") + { + uint64_t value; + proc_stat >> value; + stats->set_vmsizekb(value); + if (measure != "kB") + elog(FATAL, "Expected memory sizes in kB, but got in %s", measure.c_str()); + } + } +} +} // namespace + +void fill_self_stats(yagpcc::SystemStat *stats) +{ + fill_io_stats(stats); + fill_cpu_stats(stats); + fill_status_stats(stats); +} \ No newline at end of file diff --git a/src/ProcStats.h b/src/ProcStats.h new file mode 100644 index 00000000000..30a90a60519 --- /dev/null +++ b/src/ProcStats.h @@ -0,0 +1,7 @@ +#pragma once + +namespace yagpcc { +class SystemStat; +} + +void fill_self_stats(yagpcc::SystemStat *stats); \ No newline at end of file diff --git a/src/SpillInfoWrapper.c b/src/SpillInfoWrapper.c new file mode 100644 index 00000000000..c6ace0a693f --- /dev/null +++ b/src/SpillInfoWrapper.c @@ -0,0 +1,21 @@ +#include "postgres.h" +#include "utils/workfile_mgr.h" + +void get_spill_info(int ssid, int ccid, int32_t* file_count, int64_t* total_bytes); + +void get_spill_info(int ssid, int ccid, int32_t* file_count, int64_t* total_bytes) +{ + int count = 0; + int i = 0; + workfile_set *workfiles = workfile_mgr_cache_entries_get_copy(&count); + workfile_set *wf_iter = workfiles; + for (i = 0; i < count; ++i, ++wf_iter) + { + if (wf_iter->active && wf_iter->session_id == ssid && wf_iter->command_count == ccid) + { + *file_count += wf_iter->num_files; + *total_bytes += wf_iter->total_bytes; + } + } + pfree(workfiles); +} \ No newline at end of file diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp index 9f3200c006f..1dabb59ab3f 100644 --- a/src/hook_wrappers.cpp +++ b/src/hook_wrappers.cpp @@ -19,8 +19,8 @@ extern "C" static ExecutorStart_hook_type previous_ExecutorStart_hook = nullptr; static ExecutorFinish_hook_type previous_ExecutorFinish_hook = nullptr; -static void ya_ExecutorStart_hook(QueryDesc *queryDesc, int eflags); -static void ya_ExecutorFinish_hook(QueryDesc *queryDesc); +static void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags); +static void ya_ExecutorFinish_hook(QueryDesc *query_desc); #define REPLACE_HOOK(hookName) \ previous_##hookName = hookName; \ @@ -56,12 +56,22 @@ void hooks_deinit() else \ standard_##hookName(__VA_ARGS__); -void ya_ExecutorStart_hook(QueryDesc *queryDesc, int eflags) +void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) { - CREATE_HOOK_WRAPPER(ExecutorStart, queryDesc, eflags); + CREATE_HOOK_WRAPPER(ExecutorStart, query_desc, eflags); + PG_TRY(); + { + EventSender::instance()->ExecutorStart(query_desc, eflags); + } + PG_CATCH(); + { + ereport(WARNING, (errmsg("EventSender failed in ExecutorStart afterhook"))); + PG_RE_THROW(); + } + PG_END_TRY(); } -void ya_ExecutorFinish_hook(QueryDesc *queryDesc) +void ya_ExecutorFinish_hook(QueryDesc *query_desc) { - CREATE_HOOK_WRAPPER(ExecutorFinish, queryDesc); + CREATE_HOOK_WRAPPER(ExecutorFinish, query_desc); } \ No newline at end of file diff --git a/src/stat_statements_parser/pg_stat_statements_ya_parser.c b/src/stat_statements_parser/pg_stat_statements_ya_parser.c index f14742337bd..ae79e7dc40a 100644 --- a/src/stat_statements_parser/pg_stat_statements_ya_parser.c +++ b/src/stat_statements_parser/pg_stat_statements_ya_parser.c @@ -1,3 +1,6 @@ +// NOTE: this file is just a bunch of code borrowed from pg_stat_statements for PG 9.4 +// and from our own inhouse implementation of pg_stat_statements for managed PG + #include "postgres.h" #include <sys/stat.h> @@ -67,14 +70,15 @@ static void JumbleQuery(pgssJumbleState *jstate, Query *query); static void JumbleRangeTable(pgssJumbleState *jstate, List *rtable); static void JumbleExpr(pgssJumbleState *jstate, Node *node); static void RecordConstLocation(pgssJumbleState *jstate, int location); - -static StringInfo gen_normplan(const char *execution_plan); - +static void fill_in_constant_lengths(pgssJumbleState *jstate, const char *query); +static int comp_location(const void *a, const void *b); +StringInfo gen_normplan(const char *execution_plan); static bool need_replace(int token); - void pgss_post_parse_analyze(ParseState *pstate, Query *query); +static char *generate_normalized_query(pgssJumbleState *jstate, const char *query, + int *query_len_p, int encoding); -void stat_statements_parser_init() + void stat_statements_parser_init() { prev_post_parse_analyze_hook = post_parse_analyze_hook; post_parse_analyze_hook = pgss_post_parse_analyze; @@ -650,7 +654,7 @@ need_replace(int token) * gen_normplan - parse execution plan using flex and replace all CONST to * substitute variables. */ -static StringInfo +StringInfo gen_normplan(const char *execution_plan) { core_yyscan_t yyscanner; @@ -715,14 +719,6 @@ gen_normplan(const char *execution_plan) return plan_out; } -uint64_t get_plan_id(QueryDesc *queryDesc) -{ - if (!queryDesc->sourceText) - return 0; - StringInfo normalized = gen_normplan(queryDesc->sourceText); - return hash_any((unsigned char *)normalized->data, normalized->len); -} - /* * Post-parse-analysis hook: mark query with a queryId */ @@ -768,4 +764,228 @@ void pgss_post_parse_analyze(ParseState *pstate, Query *query) */ if (query->queryId == 0) query->queryId = 1; +} + +/* + * comp_location: comparator for qsorting pgssLocationLen structs by location + */ +static int +comp_location(const void *a, const void *b) +{ + int l = ((const pgssLocationLen *) a)->location; + int r = ((const pgssLocationLen *) b)->location; + + if (l < r) + return -1; + else if (l > r) + return +1; + else + return 0; +} + +/* + * Given a valid SQL string and an array of constant-location records, + * fill in the textual lengths of those constants. + * + * The constants may use any allowed constant syntax, such as float literals, + * bit-strings, single-quoted strings and dollar-quoted strings. This is + * accomplished by using the public API for the core scanner. + * + * It is the caller's job to ensure that the string is a valid SQL statement + * with constants at the indicated locations. Since in practice the string + * has already been parsed, and the locations that the caller provides will + * have originated from within the authoritative parser, this should not be + * a problem. + * + * Duplicate constant pointers are possible, and will have their lengths + * marked as '-1', so that they are later ignored. (Actually, we assume the + * lengths were initialized as -1 to start with, and don't change them here.) + * + * N.B. There is an assumption that a '-' character at a Const location begins + * a negative numeric constant. This precludes there ever being another + * reason for a constant to start with a '-'. + */ +static void +fill_in_constant_lengths(pgssJumbleState *jstate, const char *query) +{ + pgssLocationLen *locs; + core_yyscan_t yyscanner; + core_yy_extra_type yyextra; + core_YYSTYPE yylval; + YYLTYPE yylloc; + int last_loc = -1; + int i; + + /* + * Sort the records by location so that we can process them in order while + * scanning the query text. + */ + if (jstate->clocations_count > 1) + qsort(jstate->clocations, jstate->clocations_count, + sizeof(pgssLocationLen), comp_location); + locs = jstate->clocations; + + /* initialize the flex scanner --- should match raw_parser() */ + yyscanner = scanner_init(query, + &yyextra, + ScanKeywords, + NumScanKeywords); + + /* Search for each constant, in sequence */ + for (i = 0; i < jstate->clocations_count; i++) + { + int loc = locs[i].location; + int tok; + + Assert(loc >= 0); + + if (loc <= last_loc) + continue; /* Duplicate constant, ignore */ + + /* Lex tokens until we find the desired constant */ + for (;;) + { + tok = core_yylex(&yylval, &yylloc, yyscanner); + + /* We should not hit end-of-string, but if we do, behave sanely */ + if (tok == 0) + break; /* out of inner for-loop */ + + /* + * We should find the token position exactly, but if we somehow + * run past it, work with that. + */ + if (yylloc >= loc) + { + if (query[loc] == '-') + { + /* + * It's a negative value - this is the one and only case + * where we replace more than a single token. + * + * Do not compensate for the core system's special-case + * adjustment of location to that of the leading '-' + * operator in the event of a negative constant. It is + * also useful for our purposes to start from the minus + * symbol. In this way, queries like "select * from foo + * where bar = 1" and "select * from foo where bar = -2" + * will have identical normalized query strings. + */ + tok = core_yylex(&yylval, &yylloc, yyscanner); + if (tok == 0) + break; /* out of inner for-loop */ + } + + /* + * We now rely on the assumption that flex has placed a zero + * byte after the text of the current token in scanbuf. + */ + locs[i].length = strlen(yyextra.scanbuf + loc); + break; /* out of inner for-loop */ + } + } + + /* If we hit end-of-string, give up, leaving remaining lengths -1 */ + if (tok == 0) + break; + + last_loc = loc; + } + + scanner_finish(yyscanner); +} + +/* + * Generate a normalized version of the query string that will be used to + * represent all similar queries. + * + * Note that the normalized representation may well vary depending on + * just which "equivalent" query is used to create the hashtable entry. + * We assume this is OK. + * + * *query_len_p contains the input string length, and is updated with + * the result string length (which cannot be longer) on exit. + * + * Returns a palloc'd string. + */ +static char * +generate_normalized_query(pgssJumbleState *jstate, const char *query, + int *query_len_p, int encoding) +{ + char *norm_query; + int query_len = *query_len_p; + int i, + len_to_wrt, /* Length (in bytes) to write */ + quer_loc = 0, /* Source query byte location */ + n_quer_loc = 0, /* Normalized query byte location */ + last_off = 0, /* Offset from start for previous tok */ + last_tok_len = 0; /* Length (in bytes) of that tok */ + + /* + * Get constants' lengths (core system only gives us locations). Note + * this also ensures the items are sorted by location. + */ + fill_in_constant_lengths(jstate, query); + + /* Allocate result buffer */ + norm_query = palloc(query_len + 1); + + for (i = 0; i < jstate->clocations_count; i++) + { + int off, /* Offset from start for cur tok */ + tok_len; /* Length (in bytes) of that tok */ + + off = jstate->clocations[i].location; + tok_len = jstate->clocations[i].length; + + if (tok_len < 0) + continue; /* ignore any duplicates */ + + /* Copy next chunk (what precedes the next constant) */ + len_to_wrt = off - last_off; + len_to_wrt -= last_tok_len; + + Assert(len_to_wrt >= 0); + memcpy(norm_query + n_quer_loc, query + quer_loc, len_to_wrt); + n_quer_loc += len_to_wrt; + + /* And insert a '?' in place of the constant token */ + norm_query[n_quer_loc++] = '?'; + + quer_loc = off + tok_len; + last_off = off; + last_tok_len = tok_len; + } + + /* + * We've copied up until the last ignorable constant. Copy over the + * remaining bytes of the original query string. + */ + len_to_wrt = query_len - quer_loc; + + Assert(len_to_wrt >= 0); + memcpy(norm_query + n_quer_loc, query + quer_loc, len_to_wrt); + n_quer_loc += len_to_wrt; + + Assert(n_quer_loc <= query_len); + norm_query[n_quer_loc] = '\0'; + + *query_len_p = n_quer_loc; + return norm_query; +} + +char *gen_normquery(const char *query) +{ + if (!query) { + return NULL; + } + pgssJumbleState jstate; + jstate.jumble = (unsigned char *)palloc(JUMBLE_SIZE); + jstate.jumble_len = 0; + jstate.clocations_buf_size = 32; + jstate.clocations = (pgssLocationLen *) + palloc(jstate.clocations_buf_size * sizeof(pgssLocationLen)); + jstate.clocations_count = 0; + int query_len = strlen(query); + return generate_normalized_query(&jstate, query, &query_len, GetDatabaseEncoding()); } \ No newline at end of file diff --git a/src/stat_statements_parser/pg_stat_statements_ya_parser.h b/src/stat_statements_parser/pg_stat_statements_ya_parser.h index 274f96aebaf..aa9cd217e31 100644 --- a/src/stat_statements_parser/pg_stat_statements_ya_parser.h +++ b/src/stat_statements_parser/pg_stat_statements_ya_parser.h @@ -12,4 +12,5 @@ extern void stat_statements_parser_deinit(void); } #endif -uint64_t get_plan_id(QueryDesc *queryDesc); \ No newline at end of file +StringInfo gen_normplan(const char *executionPlan); +char *gen_normquery(const char *query); \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
