This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit 02390adcd86f76fa157a76183ac7e343ea170294
Author: Maxim Smyatkin <[email protected]>
AuthorDate: Fri Apr 18 14:58:52 2025 +0300

    [yagp_hooks_collector] Add per-slice interconnect statistics
    
    Hook into ic_teardown to collect UDP-IFC packet-level counters.
    Compile-time gated behind IC_TEARDOWN_HOOK.
---
 protos/yagpcc_metrics.proto | 56 +++++++++++++++++++++++++++++++++++++++++++++
 src/EventSender.cpp         | 53 +++++++++++++++++++++++++++++++++++++++++-
 src/EventSender.h           | 10 ++++++++
 src/ProtoUtils.cpp          | 35 ++++++++++++++++++++++++++++
 src/ProtoUtils.h            |  3 +++
 src/hook_wrappers.cpp       | 24 +++++++++++++++++++
 6 files changed, 180 insertions(+), 1 deletion(-)

diff --git a/protos/yagpcc_metrics.proto b/protos/yagpcc_metrics.proto
index fc85386c6b0..086f3e63379 100644
--- a/protos/yagpcc_metrics.proto
+++ b/protos/yagpcc_metrics.proto
@@ -42,6 +42,11 @@ message AdditionalQueryInfo {
     int64 slice_id                        = 3;
 }
 
+// Additional per-query statistics payload.
+// NOTE(review): nothing in this patch reads or writes this message; the field
+// notes below are inferred from the names only and should be confirmed.
+message AdditionalQueryStat {
+    string error_message                  = 1; // presumably the query's error text -- TODO confirm
+    repeated int64 slices                 = 2; // presumably slice identifiers -- TODO confirm
+}
+
 enum PlanGenerator
 {
     PLAN_GENERATOR_UNSPECIFIED = 0;
@@ -96,6 +101,56 @@ message NetworkStat {
     uint32 chunks = 3;
 }
 
+// Interconnect counters reported as MetricInstrumentation.interconnect.
+// Each field is filled by set_ic_stats() (src/ProtoUtils.cpp) from the
+// like-named member of the ICStatistics struct; the collector gathers those
+// values only when the interconnect type is UDPIFC.
+message InterconnectStat {
+    // Receive queue size sum when main thread is trying to get a packet
+    uint64 total_recv_queue_size = 1;
+    // Counting times when computing total_recv_queue_size
+    uint64 recv_queue_size_counting_time = 2;
+
+    // The capacity sum when packets are tried to be sent
+    uint64 total_capacity = 3;
+    // Counting times used to compute total_capacity
+    uint64 capacity_counting_time = 4;
+
+    // Total buffers available when sending packets
+    uint64 total_buffers = 5;
+    // Counting times when computing total_buffers
+    uint64 buffer_counting_time = 6;
+
+    // The number of active connections
+    uint64 active_connections_num = 7;
+
+    // The number of packet retransmits
+    int64 retransmits = 8;
+
+    // The number of cached future packets
+    int64 startup_cached_pkt_num = 9;
+
+    // The number of mismatched packets received
+    int64 mismatch_num = 10;
+
+    // The number of crc errors
+    int64 crc_errors = 11;
+
+    // The number of packets sent by sender
+    int64 snd_pkt_num = 12;
+
+    // The number of packets received by receiver
+    int64 recv_pkt_num = 13;
+
+    // Disordered packet number
+    int64 disordered_pkt_num = 14;
+
+    // Duplicate packet number
+    int64 duplicated_pkt_num = 15;
+
+    // The number of Acks received
+    int64 recv_ack_num = 16;
+
+    // The number of status query messages sent
+    int64 status_query_msg_num = 17;
+}
+
 message MetricInstrumentation {
     uint64  ntuples     = 1;    /* Total tuples produced */
     uint64  nloops      = 2;    /* # of run cycles for this node */
@@ -120,6 +175,7 @@ message MetricInstrumentation {
     double  startup_time       = 21; /* real query startup time (planning + queue time) */
     uint64  inherited_calls    = 22; /* the number of executed sub-queries */
     double  inherited_time     = 23; /* total time spend on inherited execution */
+    InterconnectStat interconnect = 24;
 }
 
 message SpillInfo {
diff --git a/src/EventSender.cpp b/src/EventSender.cpp
index cdb21ef7aa6..2ba34d1e4cc 100644
--- a/src/EventSender.cpp
+++ b/src/EventSender.cpp
@@ -1,6 +1,7 @@
 #include "Config.h"
 #include "UDSConnector.h"
 
+#define typeid __typeid
 extern "C" {
 #include "postgres.h"
 
@@ -11,7 +12,9 @@ extern "C" {
 #include "cdb/cdbdisp.h"
 #include "cdb/cdbexplain.h"
 #include "cdb/cdbvars.h"
+#include "cdb/ml_ipc.h"
 }
+#undef typeid
 
 #include "EventSender.h"
 #include "PgUtils.h"
@@ -35,7 +38,7 @@ void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) {
     // no-op: executor_after_start is enough
     break;
   case METRICS_QUERY_CANCELING:
-    // it appears we're unly interested in the actual CANCELED event.
+    // it appears we're only interested in the actual CANCELED event.
     // for now we will ignore CANCELING state unless otherwise requested from
     // end users
     break;
@@ -150,6 +153,12 @@ void EventSender::collect_query_submit(QueryDesc *query_desc) {
     // take initial metrics snapshot so that we can safely take diff afterwards
     // in END or DONE events.
     set_gp_metrics(query_msg->mutable_query_metrics(), query_desc, 0, 0);
+#ifdef IC_TEARDOWN_HOOK
+    // same for interconnect statistics
+    ic_metrics_collect();
+    set_ic_stats(query_msg->mutable_query_metrics()->mutable_instrumentation(),
+                 &ic_statistics);
+#endif
   }
 }
 
@@ -203,6 +212,12 @@ void EventSender::collect_query_done(QueryDesc *query_desc,
           set_gp_metrics(query_msg->mutable_query_metrics(), query_desc,
                          nested_calls, nested_timing);
         }
+#ifdef IC_TEARDOWN_HOOK
+        ic_metrics_collect();
+        set_ic_stats(
+            query_msg->mutable_query_metrics()->mutable_instrumentation(),
+            &ic_statistics);
+#endif
         connector->report_query(*query_msg, msg);
       }
       update_nested_counters(query_desc);
@@ -213,6 +228,39 @@ void EventSender::collect_query_done(QueryDesc *query_desc,
   }
 }
 
+// Adds the current UDPIFC interconnect counters to ic_statistics.
+// Invoked from the interconnect teardown hook and from the query submit/done
+// collectors. Does nothing unless built with IC_TEARDOWN_HOOK, the
+// interconnect type is UDPIFC, a connector exists, a command has been issued,
+// and collection is enabled and not filtered out for the current user.
+// NOTE(review): every call adds the full UDPIFCGetICStats() result; confirm
+// the server resets those counters between calls, otherwise collecting at
+// both teardown and query done would count the same traffic twice.
+void EventSender::ic_metrics_collect() {
+#ifdef IC_TEARDOWN_HOOK
+  const bool udpifc_in_use = Gp_interconnect_type == INTERCONNECT_TYPE_UDPIFC;
+  if (!udpifc_in_use) {
+    return;
+  }
+  const bool collection_active =
+      connector && gp_command_count != 0 && Config::enable_collector() &&
+      !Config::filter_user(get_user_name());
+  if (!collection_active) {
+    return;
+  }
+  // The nesting level is not available in this callback, so nested queries
+  // cannot be filtered here: the numbers are always accumulated and may be
+  // discarded later if they turn out not to be needed.
+  const auto current = UDPIFCGetICStats();
+  ICStatistics &total = ic_statistics;
+
+  // queue / capacity / buffer sums with their sample counts
+  total.totalRecvQueueSize += current.totalRecvQueueSize;
+  total.recvQueueSizeCountingTime += current.recvQueueSizeCountingTime;
+  total.totalCapacity += current.totalCapacity;
+  total.capacityCountingTime += current.capacityCountingTime;
+  total.totalBuffers += current.totalBuffers;
+  total.bufferCountingTime += current.bufferCountingTime;
+
+  // connection and packet-level counters
+  total.activeConnectionsNum += current.activeConnectionsNum;
+  total.retransmits += current.retransmits;
+  total.startupCachedPktNum += current.startupCachedPktNum;
+  total.mismatchNum += current.mismatchNum;
+  total.crcErrors += current.crcErrors;
+  total.sndPktNum += current.sndPktNum;
+  total.recvPktNum += current.recvPktNum;
+  total.disorderedPktNum += current.disorderedPktNum;
+  total.duplicatedPktNum += current.duplicatedPktNum;
+  total.recvAckNum += current.recvAckNum;
+  total.statusQueryMsgNum += current.statusQueryMsgNum;
+#endif
+}
+
 EventSender::EventSender() {
   if (Config::enable_collector() && !Config::filter_user(get_user_name())) {
     try {
@@ -221,6 +269,9 @@ EventSender::EventSender() {
       ereport(INFO, (errmsg("Unable to start query tracing %s", e.what())));
     }
   }
+#ifdef IC_TEARDOWN_HOOK
+  memset(&ic_statistics, 0, sizeof(ICStatistics));
+#endif
 }
 
 EventSender::~EventSender() {
diff --git a/src/EventSender.h b/src/EventSender.h
index 9470cbf1f98..99f7b24753d 100644
--- a/src/EventSender.h
+++ b/src/EventSender.h
@@ -4,9 +4,15 @@
 #include <unordered_map>
 #include <string>
 
+#define typeid __typeid
 extern "C" {
 #include "utils/metrics_utils.h"
+#include "cdb/ml_ipc.h"
+#ifdef IC_TEARDOWN_HOOK
+#include "cdb/ic_udpifc.h"
+#endif
 }
+#undef typeid
 
 class UDSConnector;
 struct QueryDesc;
@@ -20,6 +26,7 @@ public:
   void executor_after_start(QueryDesc *query_desc, int eflags);
   void executor_end(QueryDesc *query_desc);
   void query_metrics_collect(QueryMetricsStatus status, void *arg);
+  void ic_metrics_collect();
   void incr_depth() { nesting_level++; }
   void decr_depth() { nesting_level--; }
   EventSender();
@@ -55,5 +62,8 @@ private:
   int nesting_level = 0;
   int64_t nested_calls = 0;
   double nested_timing = 0;
+#ifdef IC_TEARDOWN_HOOK
+  ICStatistics ic_statistics;
+#endif
   std::unordered_map<std::pair<int, int>, QueryItem, pair_hash> query_msgs;
 };
\ No newline at end of file
diff --git a/src/ProtoUtils.cpp b/src/ProtoUtils.cpp
index e1be25b8b1e..c37cefb72d6 100644
--- a/src/ProtoUtils.cpp
+++ b/src/ProtoUtils.cpp
@@ -10,6 +10,10 @@ extern "C" {
 #include "access/hash.h"
 #include "cdb/cdbinterconnect.h"
 #include "cdb/cdbvars.h"
+#include "cdb/ml_ipc.h"
+#ifdef IC_TEARDOWN_HOOK
+#include "cdb/ic_udpifc.h"
+#endif
 #include "gpmon/gpmon.h"
 #include "utils/workfile_mgr.h"
 
@@ -171,6 +175,37 @@ void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc,
       WorkfileTotalBytesWritten() - metrics->mutable_spill()->totalbytes());
 }
 
+// Sets the `proto_name` field of metrics->interconnect() to the difference
+// between ic_statistics->stat_name and the value the field currently holds.
+// On a message whose field is still at its default of 0 this records the
+// running counter; applied again to the same message it replaces that
+// baseline with the growth since it was taken. The Assert checks that the
+// stored result is non-negative and does not exceed the current counter.
+// Expects `metrics` and `ic_statistics` to be in scope at the expansion site.
+// Wrapped in do/while(0) so one invocation is exactly one statement and stays
+// correct under an unbraced if/else.
+#define UPDATE_IC_STATS(proto_name, stat_name)                                 \
+  do {                                                                         \
+    auto *ic_proto_ = metrics->mutable_interconnect();                         \
+    ic_proto_->set_##proto_name(ic_statistics->stat_name -                     \
+                                ic_proto_->proto_name());                      \
+    Assert(ic_proto_->proto_name() >= 0 &&                                     \
+           ic_proto_->proto_name() <= ic_statistics->stat_name);               \
+  } while (0)
+
+// Updates every field of metrics->interconnect() from the matching member of
+// *ic_statistics via UPDATE_IC_STATS, i.e. each field becomes
+// (current counter - value already stored in the field).
+// The parameter names must stay `metrics` and `ic_statistics`: the macro
+// refers to them directly.
+// Compiles to an empty function when IC_TEARDOWN_HOOK is not defined.
+void set_ic_stats(yagpcc::MetricInstrumentation *metrics,
+                  const ICStatistics *ic_statistics) {
+#ifdef IC_TEARDOWN_HOOK
+  UPDATE_IC_STATS(total_recv_queue_size, totalRecvQueueSize);
+  UPDATE_IC_STATS(recv_queue_size_counting_time, recvQueueSizeCountingTime);
+  UPDATE_IC_STATS(total_capacity, totalCapacity);
+  UPDATE_IC_STATS(capacity_counting_time, capacityCountingTime);
+  UPDATE_IC_STATS(total_buffers, totalBuffers);
+  UPDATE_IC_STATS(buffer_counting_time, bufferCountingTime);
+  UPDATE_IC_STATS(active_connections_num, activeConnectionsNum);
+  UPDATE_IC_STATS(retransmits, retransmits);
+  UPDATE_IC_STATS(startup_cached_pkt_num, startupCachedPktNum);
+  UPDATE_IC_STATS(mismatch_num, mismatchNum);
+  UPDATE_IC_STATS(crc_errors, crcErrors);
+  UPDATE_IC_STATS(snd_pkt_num, sndPktNum);
+  UPDATE_IC_STATS(recv_pkt_num, recvPktNum);
+  UPDATE_IC_STATS(disordered_pkt_num, disorderedPktNum);
+  UPDATE_IC_STATS(duplicated_pkt_num, duplicatedPktNum);
+  UPDATE_IC_STATS(recv_ack_num, recvAckNum);
+  UPDATE_IC_STATS(status_query_msg_num, statusQueryMsgNum);
+#endif
+}
+
 yagpcc::SetQueryReq create_query_req(yagpcc::QueryStatus status) {
   yagpcc::SetQueryReq req;
   req.set_query_status(status);
diff --git a/src/ProtoUtils.h b/src/ProtoUtils.h
index 38aa75611b2..4e4ed5e76a3 100644
--- a/src/ProtoUtils.h
+++ b/src/ProtoUtils.h
@@ -1,6 +1,7 @@
 #include "protos/yagpcc_set_service.pb.h"
 
 struct QueryDesc;
+struct ICStatistics;
 
 google::protobuf::Timestamp current_ts();
 void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc);
@@ -12,5 +13,7 @@ void set_qi_slice_id(yagpcc::SetQueryReq *req);
 void set_qi_error_message(yagpcc::SetQueryReq *req);
 void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc,
                     int nested_calls, double nested_time);
+void set_ic_stats(yagpcc::MetricInstrumentation *metrics,
+                  const ICStatistics *ic_statistics);
 yagpcc::SetQueryReq create_query_req(yagpcc::QueryStatus status);
 double protots_to_double(const google::protobuf::Timestamp &ts);
\ No newline at end of file
diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp
index 93faaa0bf8f..f1d403b82f1 100644
--- a/src/hook_wrappers.cpp
+++ b/src/hook_wrappers.cpp
@@ -1,3 +1,4 @@
+#define typeid __typeid
 extern "C" {
 #include "postgres.h"
 #include "funcapi.h"
@@ -7,8 +8,10 @@ extern "C" {
 #include "utils/metrics_utils.h"
 #include "cdb/cdbexplain.h"
 #include "cdb/cdbvars.h"
+#include "cdb/ml_ipc.h"
 #include "tcop/utility.h"
 }
+#undef typeid
 
 #include "Config.h"
 #include "YagpStat.h"
@@ -21,6 +24,9 @@ static ExecutorRun_hook_type previous_ExecutorRun_hook = nullptr;
 static ExecutorFinish_hook_type previous_ExecutorFinish_hook = nullptr;
 static ExecutorEnd_hook_type previous_ExecutorEnd_hook = nullptr;
 static query_info_collect_hook_type previous_query_info_collect_hook = nullptr;
+#ifdef IC_TEARDOWN_HOOK
+static ic_teardown_hook_type previous_ic_teardown_hook = nullptr;
+#endif
 
 static void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags);
 static void ya_ExecutorRun_hook(QueryDesc *query_desc, ScanDirection direction,
@@ -28,6 +34,8 @@ static void ya_ExecutorRun_hook(QueryDesc *query_desc, ScanDirection direction,
 static void ya_ExecutorFinish_hook(QueryDesc *query_desc);
 static void ya_ExecutorEnd_hook(QueryDesc *query_desc);
 static void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg);
+static void ya_ic_teardown_hook(ChunkTransportState *transportStates,
+                                bool hasErrors);
 
 static EventSender *sender = nullptr;
 
@@ -60,6 +68,10 @@ void hooks_init() {
   ExecutorEnd_hook = ya_ExecutorEnd_hook;
   previous_query_info_collect_hook = query_info_collect_hook;
   query_info_collect_hook = ya_query_info_collect_hook;
+#ifdef IC_TEARDOWN_HOOK
+  previous_ic_teardown_hook = ic_teardown_hook;
+  ic_teardown_hook = ya_ic_teardown_hook;
+#endif
   stat_statements_parser_init();
 }
 
@@ -69,6 +81,9 @@ void hooks_deinit() {
   ExecutorRun_hook = previous_ExecutorRun_hook;
   ExecutorFinish_hook = previous_ExecutorFinish_hook;
   query_info_collect_hook = previous_query_info_collect_hook;
+#ifdef IC_TEARDOWN_HOOK
+  ic_teardown_hook = previous_ic_teardown_hook;
+#endif
   stat_statements_parser_deinit();
   if (sender) {
     delete sender;
@@ -141,6 +156,15 @@ void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg) {
   }
 }
 
+// Interconnect teardown hook: first lets the EventSender accumulate the
+// interconnect statistics (dispatched through cpp_call), then chains to the
+// teardown hook that was registered before this one, if there was any.
+void ya_ic_teardown_hook(ChunkTransportState *transportStates, bool hasErrors) {
+  cpp_call(get_sender(), &EventSender::ic_metrics_collect);
+#ifdef IC_TEARDOWN_HOOK
+  if (previous_ic_teardown_hook != nullptr) {
+    previous_ic_teardown_hook(transportStates, hasErrors);
+  }
+#endif
+}
+
 static void check_stats_loaded() {
   if (!YagpStat::loaded()) {
     ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to