This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit 02390adcd86f76fa157a76183ac7e343ea170294 Author: Maxim Smyatkin <[email protected]> AuthorDate: Fri Apr 18 14:58:52 2025 +0300 [yagp_hooks_collector] Add per-slice interconnect statistics Hook into ic_teardown to collect UDP-IFC packet-level counters. Compile-time gated behind IC_TEARDOWN_HOOK. --- protos/yagpcc_metrics.proto | 56 +++++++++++++++++++++++++++++++++++++++++++++ src/EventSender.cpp | 53 +++++++++++++++++++++++++++++++++++++++++- src/EventSender.h | 10 ++++++++ src/ProtoUtils.cpp | 35 ++++++++++++++++++++++++++++ src/ProtoUtils.h | 3 +++ src/hook_wrappers.cpp | 24 +++++++++++++++++++ 6 files changed, 180 insertions(+), 1 deletion(-) diff --git a/protos/yagpcc_metrics.proto b/protos/yagpcc_metrics.proto index fc85386c6b0..086f3e63379 100644 --- a/protos/yagpcc_metrics.proto +++ b/protos/yagpcc_metrics.proto @@ -42,6 +42,11 @@ message AdditionalQueryInfo { int64 slice_id = 3; } +message AdditionalQueryStat { + string error_message = 1; + repeated int64 slices = 2; +} + enum PlanGenerator { PLAN_GENERATOR_UNSPECIFIED = 0; @@ -96,6 +101,56 @@ message NetworkStat { uint32 chunks = 3; } +message InterconnectStat { + // Receive queue size sum when main thread is trying to get a packet + uint64 total_recv_queue_size = 1; + // Counting times when computing total_recv_queue_size + uint64 recv_queue_size_counting_time = 2; + + // The capacity sum when packets are tried to be sent + uint64 total_capacity = 3; + // Counting times used to compute total_capacity + uint64 capacity_counting_time = 4; + + // Total buffers available when sending packets + uint64 total_buffers = 5; + // Counting times when compute total_buffers + uint64 buffer_counting_time = 6; + + // The number of active connections + uint64 active_connections_num = 7; + + // The number of packet retransmits + int64 retransmits = 8; + + // The number of cached future packets + int64 startup_cached_pkt_num = 9; + + // The number of mismatched packets received + int64 mismatch_num = 10; + + // The number of crc errors + int64 crc_errors = 11; + + // The number of packets sent by sender + int64 snd_pkt_num = 12; + + // The number of packets received by receiver + int64 recv_pkt_num = 13; + + // Disordered packet number + int64 disordered_pkt_num = 14; + + // Duplicate packet number + int64 duplicated_pkt_num = 15; + + // The number of Acks received + int64 recv_ack_num = 16; + + // The number of status query messages sent + int64 status_query_msg_num = 17; +} + message MetricInstrumentation { uint64 ntuples = 1; /* Total tuples produced */ uint64 nloops = 2; /* # of run cycles for this node */ @@ -120,6 +175,7 @@ message MetricInstrumentation { double startup_time = 21; /* real query startup time (planning + queue time) */ uint64 inherited_calls = 22; /* the number of executed sub-queries */ double inherited_time = 23; /* total time spend on inherited execution */ + InterconnectStat interconnect = 24; } message SpillInfo { diff --git a/src/EventSender.cpp b/src/EventSender.cpp index cdb21ef7aa6..2ba34d1e4cc 100644 --- a/src/EventSender.cpp +++ b/src/EventSender.cpp @@ -1,6 +1,7 @@ #include "Config.h" #include "UDSConnector.h" +#define typeid __typeid extern "C" { #include "postgres.h" @@ -11,7 +12,9 @@ extern "C" { #include "cdb/cdbdisp.h" #include "cdb/cdbexplain.h" #include "cdb/cdbvars.h" +#include "cdb/ml_ipc.h" } +#undef typeid #include "EventSender.h" #include "PgUtils.h" @@ -35,7 +38,7 @@ void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) { // no-op: executor_after_start is enough break; case METRICS_QUERY_CANCELING: - // it appears we're unly interested in the actual CANCELED event. + // it appears we're only interested in the actual CANCELED event. // for now we will ignore CANCELING state unless otherwise requested from // end users break; @@ -150,6 +153,12 @@ void EventSender::collect_query_submit(QueryDesc *query_desc) { // take initial metrics snapshot so that we can safely take diff afterwards // in END or DONE events. set_gp_metrics(query_msg->mutable_query_metrics(), query_desc, 0, 0); +#ifdef IC_TEARDOWN_HOOK + // same for interconnect statistics + ic_metrics_collect(); + set_ic_stats(query_msg->mutable_query_metrics()->mutable_instrumentation(), + &ic_statistics); +#endif } } @@ -203,6 +212,12 @@ void EventSender::collect_query_done(QueryDesc *query_desc, set_gp_metrics(query_msg->mutable_query_metrics(), query_desc, nested_calls, nested_timing); } +#ifdef IC_TEARDOWN_HOOK + ic_metrics_collect(); + set_ic_stats( + query_msg->mutable_query_metrics()->mutable_instrumentation(), + &ic_statistics); +#endif connector->report_query(*query_msg, msg); } update_nested_counters(query_desc); @@ -213,6 +228,39 @@ void EventSender::collect_query_done(QueryDesc *query_desc, } } +void EventSender::ic_metrics_collect() { +#ifdef IC_TEARDOWN_HOOK + if (Gp_interconnect_type != INTERCONNECT_TYPE_UDPIFC) { + return; + } + if (!connector || gp_command_count == 0 || !Config::enable_collector() || + Config::filter_user(get_user_name())) { + return; + } + // we also would like to know nesting level here and filter queries BUT we + // don't have this kind of information from this callback. Will have to + // collect stats anyways and throw it away later, if necessary + auto metrics = UDPIFCGetICStats(); + ic_statistics.totalRecvQueueSize += metrics.totalRecvQueueSize; + ic_statistics.recvQueueSizeCountingTime += metrics.recvQueueSizeCountingTime; + ic_statistics.totalCapacity += metrics.totalCapacity; + ic_statistics.capacityCountingTime += metrics.capacityCountingTime; + ic_statistics.totalBuffers += metrics.totalBuffers; + ic_statistics.bufferCountingTime += metrics.bufferCountingTime; + ic_statistics.activeConnectionsNum += metrics.activeConnectionsNum; + ic_statistics.retransmits += metrics.retransmits; + ic_statistics.startupCachedPktNum += metrics.startupCachedPktNum; + ic_statistics.mismatchNum += metrics.mismatchNum; + ic_statistics.crcErrors += metrics.crcErrors; + ic_statistics.sndPktNum += metrics.sndPktNum; + ic_statistics.recvPktNum += metrics.recvPktNum; + ic_statistics.disorderedPktNum += metrics.disorderedPktNum; + ic_statistics.duplicatedPktNum += metrics.duplicatedPktNum; + ic_statistics.recvAckNum += metrics.recvAckNum; + ic_statistics.statusQueryMsgNum += metrics.statusQueryMsgNum; +#endif +} + EventSender::EventSender() { if (Config::enable_collector() && !Config::filter_user(get_user_name())) { try { @@ -221,6 +269,9 @@ EventSender::EventSender() { ereport(INFO, (errmsg("Unable to start query tracing %s", e.what()))); } } +#ifdef IC_TEARDOWN_HOOK + memset(&ic_statistics, 0, sizeof(ICStatistics)); +#endif } EventSender::~EventSender() { diff --git a/src/EventSender.h b/src/EventSender.h index 9470cbf1f98..99f7b24753d 100644 --- a/src/EventSender.h +++ b/src/EventSender.h @@ -4,9 +4,15 @@ #include <unordered_map> #include <string> +#define typeid __typeid extern "C" { #include "utils/metrics_utils.h" +#include "cdb/ml_ipc.h" +#ifdef IC_TEARDOWN_HOOK +#include "cdb/ic_udpifc.h" +#endif } +#undef typeid class UDSConnector; struct QueryDesc; @@ -20,6 +26,7 @@ public: void executor_after_start(QueryDesc *query_desc, int eflags); void executor_end(QueryDesc *query_desc); void query_metrics_collect(QueryMetricsStatus status, void *arg); + void ic_metrics_collect(); void incr_depth() { nesting_level++; } void decr_depth() { nesting_level--; } EventSender(); @@ -55,5 +62,8 @@ private: int nesting_level = 0; int64_t nested_calls = 0; double nested_timing = 0; +#ifdef IC_TEARDOWN_HOOK + ICStatistics ic_statistics; +#endif std::unordered_map<std::pair<int, int>, QueryItem, pair_hash> query_msgs; }; \ No newline at end of file diff --git a/src/ProtoUtils.cpp b/src/ProtoUtils.cpp index e1be25b8b1e..c37cefb72d6 100644 --- a/src/ProtoUtils.cpp +++ b/src/ProtoUtils.cpp @@ -10,6 +10,10 @@ extern "C" { #include "access/hash.h" #include "cdb/cdbinterconnect.h" #include "cdb/cdbvars.h" +#include "cdb/ml_ipc.h" +#ifdef IC_TEARDOWN_HOOK +#include "cdb/ic_udpifc.h" +#endif #include "gpmon/gpmon.h" #include "utils/workfile_mgr.h" @@ -171,6 +175,37 @@ void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc, WorkfileTotalBytesWritten() - metrics->mutable_spill()->totalbytes()); } +#define UPDATE_IC_STATS(proto_name, stat_name) \ + metrics->mutable_interconnect()->set_##proto_name( \ + ic_statistics->stat_name - \ + metrics->mutable_interconnect()->proto_name()); \ + Assert(metrics->mutable_interconnect()->proto_name() >= 0 && \ + metrics->mutable_interconnect()->proto_name() <= \ + ic_statistics->stat_name) + +void set_ic_stats(yagpcc::MetricInstrumentation *metrics, + const ICStatistics *ic_statistics) { +#ifdef IC_TEARDOWN_HOOK + UPDATE_IC_STATS(total_recv_queue_size, totalRecvQueueSize); + UPDATE_IC_STATS(recv_queue_size_counting_time, recvQueueSizeCountingTime); + UPDATE_IC_STATS(total_capacity, totalCapacity); + UPDATE_IC_STATS(capacity_counting_time, capacityCountingTime); + UPDATE_IC_STATS(total_buffers, totalBuffers); + UPDATE_IC_STATS(buffer_counting_time, bufferCountingTime); + UPDATE_IC_STATS(active_connections_num, activeConnectionsNum); + UPDATE_IC_STATS(retransmits, retransmits); + UPDATE_IC_STATS(startup_cached_pkt_num, startupCachedPktNum); + UPDATE_IC_STATS(mismatch_num, mismatchNum); + UPDATE_IC_STATS(crc_errors, crcErrors); + UPDATE_IC_STATS(snd_pkt_num, sndPktNum); + UPDATE_IC_STATS(recv_pkt_num, recvPktNum); + UPDATE_IC_STATS(disordered_pkt_num, disorderedPktNum); + UPDATE_IC_STATS(duplicated_pkt_num, duplicatedPktNum); + UPDATE_IC_STATS(recv_ack_num, recvAckNum); + UPDATE_IC_STATS(status_query_msg_num, statusQueryMsgNum); +#endif +} + yagpcc::SetQueryReq create_query_req(yagpcc::QueryStatus status) { yagpcc::SetQueryReq req; req.set_query_status(status); diff --git a/src/ProtoUtils.h b/src/ProtoUtils.h index 38aa75611b2..4e4ed5e76a3 100644 --- a/src/ProtoUtils.h +++ b/src/ProtoUtils.h @@ -1,6 +1,7 @@ #include "protos/yagpcc_set_service.pb.h" struct QueryDesc; +struct ICStatistics; google::protobuf::Timestamp current_ts(); void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc); @@ -12,5 +13,7 @@ void set_qi_slice_id(yagpcc::SetQueryReq *req); void set_qi_error_message(yagpcc::SetQueryReq *req); void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc, int nested_calls, double nested_time); +void set_ic_stats(yagpcc::MetricInstrumentation *metrics, + const ICStatistics *ic_statistics); yagpcc::SetQueryReq create_query_req(yagpcc::QueryStatus status); double protots_to_double(const google::protobuf::Timestamp &ts); \ No newline at end of file diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp index 93faaa0bf8f..f1d403b82f1 100644 --- a/src/hook_wrappers.cpp +++ b/src/hook_wrappers.cpp @@ -1,3 +1,4 @@ +#define typeid __typeid extern "C" { #include "postgres.h" #include "funcapi.h" @@ -7,8 +8,10 @@ extern "C" { #include "utils/metrics_utils.h" #include "cdb/cdbexplain.h" #include "cdb/cdbvars.h" +#include "cdb/ml_ipc.h" #include "tcop/utility.h" } +#undef typeid #include "Config.h" #include "YagpStat.h" @@ -21,6 +24,9 @@ static ExecutorRun_hook_type previous_ExecutorRun_hook = nullptr; static ExecutorFinish_hook_type previous_ExecutorFinish_hook = nullptr; static ExecutorEnd_hook_type previous_ExecutorEnd_hook = nullptr; static query_info_collect_hook_type previous_query_info_collect_hook = nullptr; +#ifdef IC_TEARDOWN_HOOK +static ic_teardown_hook_type previous_ic_teardown_hook = nullptr; +#endif static void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags); static void ya_ExecutorRun_hook(QueryDesc *query_desc, ScanDirection direction, @@ -28,6 +34,8 @@ static void ya_ExecutorRun_hook(QueryDesc *query_desc, ScanDirection direction, static void ya_ExecutorFinish_hook(QueryDesc *query_desc); static void ya_ExecutorEnd_hook(QueryDesc *query_desc); static void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg); +static void ya_ic_teardown_hook(ChunkTransportState *transportStates, + bool hasErrors); static EventSender *sender = nullptr; @@ -60,6 +68,10 @@ void hooks_init() { ExecutorEnd_hook = ya_ExecutorEnd_hook; previous_query_info_collect_hook = query_info_collect_hook; query_info_collect_hook = ya_query_info_collect_hook; +#ifdef IC_TEARDOWN_HOOK + previous_ic_teardown_hook = ic_teardown_hook; + ic_teardown_hook = ya_ic_teardown_hook; +#endif stat_statements_parser_init(); } @@ -69,6 +81,9 @@ void hooks_deinit() { ExecutorRun_hook = previous_ExecutorRun_hook; ExecutorFinish_hook = previous_ExecutorFinish_hook; query_info_collect_hook = previous_query_info_collect_hook; +#ifdef IC_TEARDOWN_HOOK + ic_teardown_hook = previous_ic_teardown_hook; +#endif stat_statements_parser_deinit(); if (sender) { delete sender; @@ -141,6 +156,15 @@ void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg) { } } +void ya_ic_teardown_hook(ChunkTransportState *transportStates, bool hasErrors) { + cpp_call(get_sender(), &EventSender::ic_metrics_collect); +#ifdef IC_TEARDOWN_HOOK + if (previous_ic_teardown_hook) { + (*previous_ic_teardown_hook)(transportStates, hasErrors); + } +#endif +} + static void check_stats_loaded() { if (!YagpStat::loaded()) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
