This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch migrate-metrics-dev
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 31c05f48218f391bca657223231c96e31e12579b
Author: Dan Wang <[email protected]>
AuthorDate: Mon Jun 12 11:24:43 2023 +0800

    feat(new_metrics): migrate metrics for profiler (#1524)
    
    https://github.com/apache/incubator-pegasus/issues/1523
    
    Profiler-level metric entity is introduced and 12 profiler-related metrics 
are
    migrated to the new framework, including the number of tasks in all queues,
    the number of tasks that have been executed, the number of cancelled tasks,
    the latency it takes for each task to wait in each queue before beginning 
to be
    executed, the latency it takes for each task to be executed, the latency 
from
    enqueue point to reply point on the server side for each RPC task, the 
non-timeout
    latency from call point to enqueue point on the client side for each RPC 
task,
    the body length of request received or response replied on the server side 
for
    each RPC task, the accumulative number of dropped RPC tasks on the server
    side, the accumulative number of timeout RPC tasks on the client side.
    
    All these metrics are configurable and thus might be NULL. 
`METRIC_DEFINE_*_NOTNULL`
    macros are introduced to help define the member functions that check if 
the
    metric variables are NULL pointers. Some macros are refactored to make it 
more
    convenient to define the member functions that increment/decrement/set/get 
the
    value of the metric variables.
---
 src/common/fs_manager.h       |   3 +-
 src/runtime/profiler.cpp      | 478 +++++++++++++++++++++---------------------
 src/runtime/profiler_header.h |  68 +++---
 src/utils/metrics.h           |  84 ++++++--
 4 files changed, 352 insertions(+), 281 deletions(-)

diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h
index 27c8cc766..c0c461c57 100644
--- a/src/common/fs_manager.h
+++ b/src/common/fs_manager.h
@@ -29,11 +29,10 @@
 #include "common/replication_other_types.h"
 #include "metadata_types.h"
 #include "utils/autoref_ptr.h"
-#include "utils/error_code.h"
 #include "utils/flags.h"
-#include "utils/string_view.h"
 #include "utils/metrics.h"
 #include "utils/ports.h"
+#include "utils/string_view.h"
 #include "utils/zlocks.h"
 
 namespace dsn {
diff --git a/src/runtime/profiler.cpp b/src/runtime/profiler.cpp
index 4ea1694c2..b506d11ea 100644
--- a/src/runtime/profiler.cpp
+++ b/src/runtime/profiler.cpp
@@ -49,16 +49,16 @@ START<== queue(server) == ENQUEUE <===== net(reply) ======= 
REPLY <=============
 */
 #include "runtime/profiler.h"
 
-#include <stddef.h>
 #include <algorithm>
 #include <atomic>
 #include <cstdint>
 #include <memory>
+#include <set>
 #include <string>
+#include <vector>
 
 #include "aio/aio_task.h"
-#include "perf_counter/perf_counter.h"
-#include "perf_counter/perf_counter_wrapper.h"
+#include "fmt/core.h"
 #include "profiler_header.h"
 #include "runtime/api_layer1.h"
 #include "runtime/rpc/rpc_message.h"
@@ -67,9 +67,80 @@ START<== queue(server) == ENQUEUE <===== net(reply) ======= 
REPLY <=============
 #include "runtime/task/task_spec.h"
 #include "utils/config_api.h"
 #include "utils/extensible_object.h"
-#include "utils/fmt_logging.h"
 #include "utils/flags.h"
+#include "utils/fmt_logging.h"
 #include "utils/join_point.h"
+#include "utils/metrics.h"
+#include "utils/string_view.h"
+
+METRIC_DEFINE_entity(profiler);
+
+METRIC_DEFINE_gauge_int64(profiler,
+                          profiler_queued_tasks,
+                          dsn::metric_unit::kTasks,
+                          "The number of tasks in all queues");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_queue_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency it takes for each task to wait in 
each queue "
+                               "before beginning to be executed");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_execute_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency it takes for each task to be 
executed");
+
+METRIC_DEFINE_counter(profiler,
+                      profiler_executed_tasks,
+                      dsn::metric_unit::kTasks,
+                      "The number of tasks that have been executed");
+
+METRIC_DEFINE_counter(profiler,
+                      profiler_cancelled_tasks,
+                      dsn::metric_unit::kTasks,
+                      "The number of cancelled tasks");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_server_rpc_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The latency from enqueue point to reply point 
on the server side "
+                               "for each RPC task");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_server_rpc_request_bytes,
+                               dsn::metric_unit::kBytes,
+                               "The body length of request received on the 
server side for each "
+                               "RPC task");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_server_rpc_response_bytes,
+                               dsn::metric_unit::kBytes,
+                               "The body length of response replied on the 
server side for each "
+                               "RPC task");
+
+METRIC_DEFINE_counter(profiler,
+                      profiler_dropped_timeout_rpcs,
+                      dsn::metric_unit::kTasks,
+                      "The accumulative number of dropped RPC tasks on the 
server side "
+                      "due to timeout");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_client_rpc_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The non-timeout latency from call point to 
enqueue point on "
+                               "the client side for each RPC task");
+
+METRIC_DEFINE_counter(profiler,
+                      profiler_client_timeout_rpcs,
+                      dsn::metric_unit::kTasks,
+                      "The accumulative number of timeout RPC tasks on the 
client side");
+
+METRIC_DEFINE_percentile_int64(profiler,
+                               profiler_aio_latency_ns,
+                               dsn::metric_unit::kNanoSeconds,
+                               "The duration of the whole AIO operation (begin 
to aio -> "
+                               "executing -> finished -> callback is put into 
queue)");
 
 namespace dsn {
 struct service_spec;
@@ -86,7 +157,7 @@ DSN_DEFINE_bool(task..default,
 typedef uint64_extension_helper<task_spec_profiler, task> 
task_ext_for_profiler;
 typedef uint64_extension_helper<task_spec_profiler, message_ex> 
message_ext_for_profiler;
 
-std::unique_ptr<task_spec_profiler[]> s_spec_profilers;
+std::vector<task_spec_profiler> s_spec_profilers;
 
 int s_task_code_max = 0;
 
@@ -113,9 +184,7 @@ static void profiler_on_task_enqueue(task *caller, task 
*callee)
 
     task_ext_for_profiler::get(callee) = dsn_now_ns();
     if (callee->delay_milliseconds() == 0) {
-        auto ptr = s_spec_profilers[callee_code].ptr[TASK_IN_QUEUE].get();
-        if (ptr != nullptr)
-            ptr->increment();
+        METRIC_INCREMENT(s_spec_profilers[callee_code], profiler_queued_tasks);
     }
 }
 
@@ -127,14 +196,10 @@ static void profiler_on_task_begin(task *this_)
 
     uint64_t &qts = task_ext_for_profiler::get(this_);
     uint64_t now = dsn_now_ns();
-    auto ptr = s_spec_profilers[code].ptr[TASK_QUEUEING_TIME_NS].get();
-    if (ptr != nullptr)
-        ptr->set(now - qts);
+    METRIC_SET(s_spec_profilers[code], profiler_queue_latency_ns, now - qts);
     qts = now;
 
-    ptr = s_spec_profilers[code].ptr[TASK_IN_QUEUE].get();
-    if (ptr != nullptr)
-        ptr->decrement();
+    METRIC_DECREMENT(s_spec_profilers[code], profiler_queued_tasks);
 }
 
 static void profiler_on_task_end(task *this_)
@@ -144,13 +209,9 @@ static void profiler_on_task_end(task *this_)
 
     uint64_t qts = task_ext_for_profiler::get(this_);
     uint64_t now = dsn_now_ns();
-    auto ptr = s_spec_profilers[code].ptr[TASK_EXEC_TIME_NS].get();
-    if (ptr != nullptr)
-        ptr->set(now - qts);
+    METRIC_SET(s_spec_profilers[code], profiler_execute_latency_ns, now - qts);
 
-    ptr = s_spec_profilers[code].ptr[TASK_THROUGHPUT].get();
-    if (ptr != nullptr)
-        ptr->increment();
+    METRIC_INCREMENT(s_spec_profilers[code], profiler_executed_tasks);
 }
 
 static void profiler_on_task_cancelled(task *this_)
@@ -158,9 +219,7 @@ static void profiler_on_task_cancelled(task *this_)
     auto code = this_->spec().code;
     CHECK(code >= 0 && code <= s_task_code_max, "code = {}", code.code());
 
-    auto ptr = s_spec_profilers[code].ptr[TASK_CANCELLED].get();
-    if (ptr != nullptr)
-        ptr->increment();
+    METRIC_INCREMENT(s_spec_profilers[code], profiler_cancelled_tasks);
 }
 
 static void profiler_on_task_wait_pre(task *caller, task *callee, uint32_t 
timeout_ms) {}
@@ -198,14 +257,10 @@ static void profiler_on_aio_enqueue(aio_task *this_)
     uint64_t &ats = task_ext_for_profiler::get(this_);
     uint64_t now = dsn_now_ns();
 
-    auto ptr = s_spec_profilers[code].ptr[AIO_LATENCY_NS].get();
-    if (ptr != nullptr)
-        ptr->set(now - ats);
+    METRIC_SET(s_spec_profilers[code], profiler_aio_latency_ns, now - ats);
     ats = now;
 
-    ptr = s_spec_profilers[code].ptr[TASK_IN_QUEUE].get();
-    if (ptr != nullptr)
-        ptr->increment();
+    METRIC_INCREMENT(s_spec_profilers[code], profiler_queued_tasks);
 }
 
 // return true means continue, otherwise early terminate with 
task::set_error_code
@@ -239,23 +294,17 @@ static void 
profiler_on_rpc_request_enqueue(rpc_request_task *callee)
     task_ext_for_profiler::get(callee) = now;
     message_ext_for_profiler::get(callee->get_request()) = now;
 
-    auto ptr = s_spec_profilers[callee_code].ptr[TASK_IN_QUEUE].get();
-    if (ptr != nullptr) {
-        ptr->increment();
-    }
-    ptr = 
s_spec_profilers[callee_code].ptr[RPC_SERVER_SIZE_PER_REQUEST_IN_BYTES].get();
-    if (ptr != nullptr) {
-        ptr->set(callee->get_request()->header->body_length);
-    }
+    METRIC_INCREMENT(s_spec_profilers[callee_code], profiler_queued_tasks);
+
+    METRIC_SET(s_spec_profilers[callee_code],
+               profiler_server_rpc_request_bytes,
+               callee->get_request()->header->body_length);
 }
 
 static void profile_on_rpc_task_dropped(rpc_request_task *callee)
 {
     auto code = callee->spec().code;
-    auto ptr = s_spec_profilers[code].ptr[RPC_DROPPED_IF_TIMEOUT].get();
-    if (ptr != nullptr) {
-        ptr->increment();
-    }
+    METRIC_INCREMENT(s_spec_profilers[code], profiler_dropped_timeout_rpcs);
 }
 
 static void profiler_on_rpc_create_response(message_ex *req, message_ex *resp)
@@ -283,14 +332,11 @@ static void profiler_on_rpc_reply(task *caller, 
message_ex *msg)
     CHECK_NOTNULL(spec, "task_spec cannot be null, code = {}", 
msg->local_rpc_code.code());
     auto code = spec->rpc_paired_code;
     CHECK(code >= 0 && code <= s_task_code_max, "code = {}", code.code());
-    auto ptr = s_spec_profilers[code].ptr[RPC_SERVER_LATENCY_NS].get();
-    if (ptr != nullptr) {
-        ptr->set(now - qts);
-    }
-    ptr = 
s_spec_profilers[code].ptr[RPC_SERVER_SIZE_PER_RESPONSE_IN_BYTES].get();
-    if (ptr != nullptr) {
-        ptr->set(msg->header->body_length);
-    }
+
+    METRIC_SET(s_spec_profilers[code], profiler_server_rpc_latency_ns, now - 
qts);
+
+    METRIC_SET(
+        s_spec_profilers[code], profiler_server_rpc_response_bytes, 
msg->header->body_length);
 }
 
 static void profiler_on_rpc_response_enqueue(rpc_response_task *resp)
@@ -302,220 +348,176 @@ static void 
profiler_on_rpc_response_enqueue(rpc_response_task *resp)
     uint64_t now = dsn_now_ns();
 
     if (resp->get_response() != nullptr) {
-        auto ptr = 
s_spec_profilers[resp_code].ptr[RPC_CLIENT_NON_TIMEOUT_LATENCY_NS].get();
-        if (ptr != nullptr)
-            ptr->set(now - cts);
+        METRIC_SET(s_spec_profilers[resp_code], 
profiler_client_rpc_latency_ns, now - cts);
     } else {
-        auto ptr = 
s_spec_profilers[resp_code].ptr[RPC_CLIENT_TIMEOUT_THROUGHPUT].get();
-        if (ptr != nullptr)
-            ptr->increment();
+        METRIC_INCREMENT(s_spec_profilers[resp_code], 
profiler_client_timeout_rpcs);
     }
     cts = now;
 
-    auto ptr = s_spec_profilers[resp_code].ptr[TASK_IN_QUEUE].get();
-    if (ptr != nullptr)
-        ptr->increment();
+    METRIC_INCREMENT(s_spec_profilers[resp_code], profiler_queued_tasks);
 }
 
-void profiler::install(service_spec &)
+namespace {
+
+metric_entity_ptr instantiate_profiler_metric_entity(const std::string 
&task_name)
 {
-    s_task_code_max = dsn::task_code::max();
-    s_spec_profilers.reset(new task_spec_profiler[s_task_code_max + 1]);
-    task_ext_for_profiler::register_ext();
-    message_ext_for_profiler::register_ext();
+    auto entity_id = fmt::format("task_{}", task_name);
 
-    for (int i = 0; i <= s_task_code_max; i++) {
-        if (i == TASK_CODE_INVALID)
-            continue;
+    return METRIC_ENTITY_profiler.instantiate(entity_id, {{"task_name", 
task_name}});
+}
+
+} // anonymous namespace
+
+task_spec_profiler::task_spec_profiler(int code)
+    : collect_call_count(false),
+      is_profile(false),
+      call_counts(new std::atomic<int64_t>[ s_task_code_max + 1 ]),
+      _task_name(dsn::task_code(code).to_string()),
+      _profiler_metric_entity(instantiate_profiler_metric_entity(_task_name))
+{
+    const auto &section_name = fmt::format("task.{}", _task_name);
+    auto spec = task_spec::get(code);
+    CHECK_NOTNULL(spec, "spec should be non-null: task_code={}, task_name={}", 
code, _task_name);
+
+    collect_call_count = dsn_config_get_value_bool(
+        section_name.c_str(),
+        "collect_call_count",
+        FLAGS_collect_call_count,
+        "whether to collect how many time this kind of tasks invoke each of 
other kinds tasks");
+
+    for (int i = 0; i <= s_task_code_max; ++i) {
+        call_counts[i].store(0);
+    }
+
+    is_profile = dsn_config_get_value_bool(section_name.c_str(),
+                                           "is_profile",
+                                           FLAGS_is_profile,
+                                           "whether to profile this kind of 
task");
 
-        std::string name(dsn::task_code(i).to_string());
-        std::string section_name = std::string("task.") + name;
-        task_spec *spec = task_spec::get(i);
-        CHECK_NOTNULL(spec, "");
+    if (!is_profile) {
+        return;
+    }
 
-        s_spec_profilers[i].collect_call_count = dsn_config_get_value_bool(
+    if (dsn_config_get_value_bool(
             section_name.c_str(),
-            "collect_call_count",
-            FLAGS_collect_call_count,
-            "whether to collect how many time this kind of tasks invoke each 
of other kinds tasks");
-        s_spec_profilers[i].call_counts = new std::atomic<int64_t>[ 
s_task_code_max + 1 ];
-        std::fill(s_spec_profilers[i].call_counts,
-                  s_spec_profilers[i].call_counts + s_task_code_max + 1,
-                  0);
-
-        s_spec_profilers[i].is_profile =
-            dsn_config_get_value_bool(section_name.c_str(),
-                                      "is_profile",
-                                      FLAGS_is_profile,
-                                      "whether to profile this kind of task");
-
-        if (!s_spec_profilers[i].is_profile)
-            continue;
+            "profiler::inqueue",
+            true,
+            "whether to profile the number of this kind of tasks in all 
queues")) {
+        METRIC_VAR_ASSIGN_profiler(profiler_queued_tasks);
+    }
 
-        if (dsn_config_get_value_bool(
-                section_name.c_str(),
-                "profiler::inqueue",
-                true,
-                "whether to profile the number of this kind of tasks in all 
queues"))
-            s_spec_profilers[i].ptr[TASK_IN_QUEUE].init_global_counter(
-                "zion",
-                "profiler",
-                (name + std::string(".inqueue")).c_str(),
-                COUNTER_TYPE_NUMBER,
-                "task number in all queues");
+    if (dsn_config_get_value_bool(section_name.c_str(),
+                                  "profiler::queue",
+                                  true,
+                                  "whether to profile the queuing time of a 
task")) {
+        METRIC_VAR_ASSIGN_profiler(profiler_queue_latency_ns);
+    }
+
+    if (dsn_config_get_value_bool(section_name.c_str(),
+                                  "profiler::exec",
+                                  true,
+                                  "whether to profile the executing time of a 
task")) {
+        METRIC_VAR_ASSIGN_profiler(profiler_execute_latency_ns);
+    }
 
+    if (dsn_config_get_value_bool(
+            section_name.c_str(), "profiler::qps", true, "whether to profile 
the qps of a task")) {
+        METRIC_VAR_ASSIGN_profiler(profiler_executed_tasks);
+    }
+
+    if (dsn_config_get_value_bool(section_name.c_str(),
+                                  "profiler::cancelled",
+                                  true,
+                                  "whether to profile the cancelled times of a 
task")) {
+        METRIC_VAR_ASSIGN_profiler(profiler_cancelled_tasks);
+    }
+
+    if (spec->type == dsn_task_type_t::TASK_TYPE_RPC_REQUEST) {
         if (dsn_config_get_value_bool(section_name.c_str(),
-                                      "profiler::queue",
+                                      "profiler::latency.server",
                                       true,
-                                      "whether to profile the queuing time of 
a task"))
-            s_spec_profilers[i].ptr[TASK_QUEUEING_TIME_NS].init_global_counter(
-                "zion",
-                "profiler",
-                (name + std::string(".queue(ns)")).c_str(),
-                COUNTER_TYPE_NUMBER_PERCENTILES,
-                "latency due to waiting in the queue");
-
+                                      "whether to profile the server latency 
of a task")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_server_rpc_latency_ns);
+        }
+        if (dsn_config_get_value_bool(section_name.c_str(),
+                                      "profiler::size.request.server",
+                                      false,
+                                      "whether to profile the size per 
request")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_server_rpc_request_bytes);
+        }
         if (dsn_config_get_value_bool(section_name.c_str(),
-                                      "profiler::exec",
+                                      "profiler::size.response.server",
+                                      false,
+                                      "whether to profile the size per 
response")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_server_rpc_response_bytes);
+        }
+        if (dsn_config_get_value_bool(section_name.c_str(),
+                                      
"rpc_request_dropped_before_execution_when_timeout",
+                                      false,
+                                      "whether to profile the number of rpc 
dropped for timeout")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_dropped_timeout_rpcs);
+        }
+    } else if (spec->type == dsn_task_type_t::TASK_TYPE_RPC_RESPONSE) {
+        if (dsn_config_get_value_bool(section_name.c_str(),
+                                      "profiler::latency.client",
                                       true,
-                                      "whether to profile the executing time 
of a task"))
-            s_spec_profilers[i].ptr[TASK_EXEC_TIME_NS].init_global_counter(
-                "zion",
-                "profiler",
-                (name + std::string(".exec(ns)")).c_str(),
-                COUNTER_TYPE_NUMBER_PERCENTILES,
-                "latency due to executing tasks");
-
+                                      "whether to profile the client latency 
of a task")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_client_rpc_latency_ns);
+        }
         if (dsn_config_get_value_bool(section_name.c_str(),
-                                      "profiler::qps",
+                                      "profiler::timeout.qps",
                                       true,
-                                      "whether to profile the qps of a task"))
-            s_spec_profilers[i].ptr[TASK_THROUGHPUT].init_global_counter(
-                "zion",
-                "profiler",
-                (name + std::string(".qps")).c_str(),
-                COUNTER_TYPE_RATE,
-                "task numbers per second");
-
+                                      "whether to profile the timeout qps of a 
task")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_client_timeout_rpcs);
+        }
+    } else if (spec->type == dsn_task_type_t::TASK_TYPE_AIO) {
         if (dsn_config_get_value_bool(section_name.c_str(),
-                                      "profiler::cancelled",
+                                      "profiler::latency",
                                       true,
-                                      "whether to profile the cancelled times 
of a task"))
-            s_spec_profilers[i].ptr[TASK_CANCELLED].init_global_counter(
-                "zion",
-                "profiler",
-                (name + std::string(".cancelled")).c_str(),
-                COUNTER_TYPE_NUMBER,
-                "cancelled times of a specific task type");
-
-        if (spec->type == dsn_task_type_t::TASK_TYPE_RPC_REQUEST) {
-            if (dsn_config_get_value_bool(section_name.c_str(),
-                                          "profiler::latency.server",
-                                          true,
-                                          "whether to profile the server 
latency of a task")) {
-                
s_spec_profilers[i].ptr[RPC_SERVER_LATENCY_NS].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".latency.server")).c_str(),
-                    COUNTER_TYPE_NUMBER_PERCENTILES,
-                    "latency from enqueue point to reply point on the server 
side for RPC "
-                    "tasks");
-            }
-            if (dsn_config_get_value_bool(section_name.c_str(),
-                                          "profiler::size.request.server",
-                                          false,
-                                          "whether to profile the size per 
request")) {
-                
s_spec_profilers[i].ptr[RPC_SERVER_SIZE_PER_REQUEST_IN_BYTES].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".size.request.server")).c_str(),
-                    COUNTER_TYPE_NUMBER_PERCENTILES,
-                    "");
-            }
-            if (dsn_config_get_value_bool(section_name.c_str(),
-                                          "profiler::size.response.server",
-                                          false,
-                                          "whether to profile the size per 
response")) {
-                
s_spec_profilers[i].ptr[RPC_SERVER_SIZE_PER_RESPONSE_IN_BYTES].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".size.response.server")).c_str(),
-                    COUNTER_TYPE_NUMBER_PERCENTILES,
-                    "");
-            }
-            if (dsn_config_get_value_bool(
-                    section_name.c_str(),
-                    "rpc_request_dropped_before_execution_when_timeout",
-                    false,
-                    "whether to profile the number of rpc dropped for 
timeout"))
-                
s_spec_profilers[i].ptr[RPC_DROPPED_IF_TIMEOUT].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".rpc.dropped")).c_str(),
-                    COUNTER_TYPE_VOLATILE_NUMBER,
-                    "rpc dropped if queue time exceed client timeout");
-        } else if (spec->type == dsn_task_type_t::TASK_TYPE_RPC_RESPONSE) {
-            if (dsn_config_get_value_bool(section_name.c_str(),
-                                          "profiler::latency.client",
-                                          true,
-                                          "whether to profile the client 
latency of a task"))
-                
s_spec_profilers[i].ptr[RPC_CLIENT_NON_TIMEOUT_LATENCY_NS].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".latency.client(ns)")).c_str(),
-                    COUNTER_TYPE_NUMBER_PERCENTILES,
-                    "latency from call point to enqueue point on the client 
side for RPC "
-                    "tasks");
-            if (dsn_config_get_value_bool(section_name.c_str(),
-                                          "profiler::timeout.qps",
-                                          true,
-                                          "whether to profile the timeout qps 
of a task"))
-                
s_spec_profilers[i].ptr[RPC_CLIENT_TIMEOUT_THROUGHPUT].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".timeout.qps")).c_str(),
-                    COUNTER_TYPE_RATE,
-                    "time-out task numbers per second for RPC tasks");
-        } else if (spec->type == dsn_task_type_t::TASK_TYPE_AIO) {
-            if (dsn_config_get_value_bool(section_name.c_str(),
-                                          "profiler::latency",
-                                          true,
-                                          "whether to profile the latency of 
an AIO task"))
-                s_spec_profilers[i].ptr[AIO_LATENCY_NS].init_global_counter(
-                    "zion",
-                    "profiler",
-                    (name + std::string(".latency(ns)")).c_str(),
-                    COUNTER_TYPE_NUMBER_PERCENTILES,
-                    "latency from call point to enqueue point for AIO tasks");
+                                      "whether to profile the latency of an 
AIO task")) {
+            METRIC_VAR_ASSIGN_profiler(profiler_aio_latency_ns);
         }
+    }
+
+    spec->on_task_create.put_back(profiler_on_task_create, "profiler");
+    spec->on_task_enqueue.put_back(profiler_on_task_enqueue, "profiler");
+    spec->on_task_begin.put_back(profiler_on_task_begin, "profiler");
+    spec->on_task_end.put_back(profiler_on_task_end, "profiler");
+    spec->on_task_cancelled.put_back(profiler_on_task_cancelled, "profiler");
+    spec->on_task_wait_pre.put_back(profiler_on_task_wait_pre, "profiler");
+    spec->on_task_wait_post.put_back(profiler_on_task_wait_post, "profiler");
+    spec->on_task_cancel_post.put_back(profiler_on_task_cancel_post, 
"profiler");
+    spec->on_aio_call.put_back(profiler_on_aio_call, "profiler");
+    spec->on_aio_enqueue.put_back(profiler_on_aio_enqueue, "profiler");
+    spec->on_rpc_call.put_back(profiler_on_rpc_call, "profiler");
+    spec->on_rpc_request_enqueue.put_back(profiler_on_rpc_request_enqueue, 
"profiler");
+    spec->on_rpc_task_dropped.put_back(profile_on_rpc_task_dropped, 
"profiler");
+    spec->on_rpc_create_response.put_back(profiler_on_rpc_create_response, 
"profiler");
+    spec->on_rpc_reply.put_back(profiler_on_rpc_reply, "profiler");
+    spec->on_rpc_response_enqueue.put_back(profiler_on_rpc_response_enqueue, 
"profiler");
+}
+
+const metric_entity_ptr &task_spec_profiler::profiler_metric_entity() const
+{
+    CHECK_NOTNULL(_profiler_metric_entity,
+                  "profiler metric entity (task_name={}) should has been 
instantiated: "
+                  "uninitialized entity cannot be used to instantiate metric",
+                  _task_name);
+    return _profiler_metric_entity;
+}
 
-        // we don't use perf_counter_ptr but perf_counter* in ptr[xxx] to 
avoid unnecessary memory
-        // access cost
-        // we need to add reference so that the counters won't go
-        // release_ref should be done when the profiler exits (which never 
happens right now so we
-        // omit that for the time being)
-        for (size_t j = 0; j < sizeof(s_spec_profilers[i].ptr) / 
sizeof(perf_counter *); j++) {
-            if (s_spec_profilers[i].ptr[j].get() != nullptr) {
-                s_spec_profilers[i].ptr[j]->add_ref();
-            }
+void profiler::install(service_spec &)
+{
+    s_task_code_max = dsn::task_code::max();
+    task_ext_for_profiler::register_ext();
+    message_ext_for_profiler::register_ext();
+
+    for (int code = 0; code <= s_task_code_max; ++code) {
+        if (code == TASK_CODE_INVALID) {
+            continue;
         }
 
-        spec->on_task_create.put_back(profiler_on_task_create, "profiler");
-        spec->on_task_enqueue.put_back(profiler_on_task_enqueue, "profiler");
-        spec->on_task_begin.put_back(profiler_on_task_begin, "profiler");
-        spec->on_task_end.put_back(profiler_on_task_end, "profiler");
-        spec->on_task_cancelled.put_back(profiler_on_task_cancelled, 
"profiler");
-        spec->on_task_wait_pre.put_back(profiler_on_task_wait_pre, "profiler");
-        spec->on_task_wait_post.put_back(profiler_on_task_wait_post, 
"profiler");
-        spec->on_task_cancel_post.put_back(profiler_on_task_cancel_post, 
"profiler");
-        spec->on_aio_call.put_back(profiler_on_aio_call, "profiler");
-        spec->on_aio_enqueue.put_back(profiler_on_aio_enqueue, "profiler");
-        spec->on_rpc_call.put_back(profiler_on_rpc_call, "profiler");
-        spec->on_rpc_request_enqueue.put_back(profiler_on_rpc_request_enqueue, 
"profiler");
-        spec->on_rpc_task_dropped.put_back(profile_on_rpc_task_dropped, 
"profiler");
-        spec->on_rpc_create_response.put_back(profiler_on_rpc_create_response, 
"profiler");
-        spec->on_rpc_reply.put_back(profiler_on_rpc_reply, "profiler");
-        
spec->on_rpc_response_enqueue.put_back(profiler_on_rpc_response_enqueue, 
"profiler");
+        s_spec_profilers.emplace_back(code);
     }
 }
 
diff --git a/src/runtime/profiler_header.h b/src/runtime/profiler_header.h
index 54b0df335..a32935804 100644
--- a/src/runtime/profiler_header.h
+++ b/src/runtime/profiler_header.h
@@ -25,45 +25,55 @@
  */
 
 #pragma once
+
 #include <iomanip>
-#include "perf_counter/perf_counter_wrapper.h"
+
+#include "utils/autoref_ptr.h"
+#include "utils/metrics.h"
 
 namespace dsn {
 namespace tools {
 
-enum perf_counter_ptr_type
-{
-    TASK_QUEUEING_TIME_NS,
-    TASK_EXEC_TIME_NS,
-    TASK_THROUGHPUT,
-    TASK_CANCELLED,
-    AIO_LATENCY_NS,
-    RPC_SERVER_LATENCY_NS,
-    RPC_SERVER_SIZE_PER_REQUEST_IN_BYTES,
-    RPC_SERVER_SIZE_PER_RESPONSE_IN_BYTES,
-    RPC_CLIENT_NON_TIMEOUT_LATENCY_NS,
-    RPC_CLIENT_TIMEOUT_THROUGHPUT,
-    TASK_IN_QUEUE,
-    RPC_DROPPED_IF_TIMEOUT,
-
-    PERF_COUNTER_COUNT,
-    PERF_COUNTER_INVALID
-};
-
 struct task_spec_profiler
 {
-    perf_counter_wrapper ptr[PERF_COUNTER_COUNT];
     bool collect_call_count;
     bool is_profile;
-    std::atomic<int64_t> *call_counts;
+    std::unique_ptr<std::atomic<int64_t>[]> call_counts;
+
+    task_spec_profiler(int code);
+    const metric_entity_ptr &profiler_metric_entity() const;
+
+    METRIC_DEFINE_INCREMENT_NOTNULL(profiler_queued_tasks)
+    METRIC_DEFINE_DECREMENT_NOTNULL(profiler_queued_tasks)
+    METRIC_DEFINE_SET_NOTNULL(profiler_queue_latency_ns, int64_t)
+    METRIC_DEFINE_SET_NOTNULL(profiler_execute_latency_ns, int64_t)
+    METRIC_DEFINE_INCREMENT_NOTNULL(profiler_executed_tasks)
+    METRIC_DEFINE_INCREMENT_NOTNULL(profiler_cancelled_tasks)
+    METRIC_DEFINE_SET_NOTNULL(profiler_server_rpc_latency_ns, int64_t)
+    METRIC_DEFINE_SET_NOTNULL(profiler_server_rpc_request_bytes, int64_t)
+    METRIC_DEFINE_SET_NOTNULL(profiler_server_rpc_response_bytes, int64_t)
+    METRIC_DEFINE_INCREMENT_NOTNULL(profiler_dropped_timeout_rpcs)
+    METRIC_DEFINE_SET_NOTNULL(profiler_client_rpc_latency_ns, int64_t)
+    METRIC_DEFINE_INCREMENT_NOTNULL(profiler_client_timeout_rpcs)
+    METRIC_DEFINE_SET_NOTNULL(profiler_aio_latency_ns, int64_t)
 
-    task_spec_profiler()
-    {
-        collect_call_count = false;
-        is_profile = false;
-        call_counts = nullptr;
-        memset((void *)ptr, 0, sizeof(ptr));
-    }
+private:
+    const std::string _task_name;
+    const metric_entity_ptr _profiler_metric_entity;
+
+    METRIC_VAR_DECLARE_gauge_int64(profiler_queued_tasks);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_queue_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_execute_latency_ns);
+    METRIC_VAR_DECLARE_counter(profiler_executed_tasks);
+    METRIC_VAR_DECLARE_counter(profiler_cancelled_tasks);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_server_rpc_latency_ns);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_server_rpc_request_bytes);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_server_rpc_response_bytes);
+    METRIC_VAR_DECLARE_counter(profiler_dropped_timeout_rpcs);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_client_rpc_latency_ns);
+    METRIC_VAR_DECLARE_counter(profiler_client_timeout_rpcs);
+    METRIC_VAR_DECLARE_percentile_int64(profiler_aio_latency_ns);
 };
+
 } // namespace tools
 } // namespace dsn
diff --git a/src/utils/metrics.h b/src/utils/metrics.h
index 2ecc4d305..e484e4c1d 100644
--- a/src/utils/metrics.h
+++ b/src/utils/metrics.h
@@ -192,6 +192,7 @@ class error_code;
 #define METRIC_VAR_INIT_partition(name, ...) METRIC_VAR_INIT(name, partition, 
##__VA_ARGS__)
 #define METRIC_VAR_INIT_backup_policy(name, ...) METRIC_VAR_INIT(name, 
backup_policy, ##__VA_ARGS__)
 #define METRIC_VAR_INIT_queue(name, ...) METRIC_VAR_INIT(name, queue, 
##__VA_ARGS__)
+#define METRIC_VAR_ASSIGN_profiler(name, ...) METRIC_VAR_ASSIGN(name, 
profiler, ##__VA_ARGS__)
 
 // Perform increment_by() operations on gauges and counters.
 #define METRIC_VAR_INCREMENT_BY(name, x)                                       
                    \
@@ -238,37 +239,96 @@ class error_code;
 #define METRIC_VAR_AUTO_COUNT(name, ...)                                       
                    \
     dsn::auto_count __##name##_auto_count(METRIC_VAR_NAME(name), ##__VA_ARGS__)
 
+// Implement a member function that runs `method` on the metric variable, 
without any argument.
+#define METRIC_DEFINE_NO_ARG(method, name)                                     
                    \
+    void METRIC_FUNC_NAME_##method(name)() { METRIC_VAR_##method(name); }
+
+// Implement a member function that runs `method` on the metric variable if 
NOT NULL,
+// without any argument.
+#define METRIC_DEFINE_NO_ARG_NOTNULL(method, name)                             
                    \
+    void METRIC_FUNC_NAME_##method(name)()                                     
                    \
+    {                                                                          
                    \
+        if (METRIC_VAR_NAME(name) != nullptr) {                                
                    \
+            METRIC_VAR_##method(name);                                         
                    \
+        }                                                                      
                    \
+    }
+
+// Implement a member function that runs `method` on the metric variable and 
returns `ret_type`,
+// without any argument.
+#define METRIC_DEFINE_RET_AND_NO_ARG(ret_type, method, name)                   
                    \
+    ret_type METRIC_FUNC_NAME_##method(name)() { return 
METRIC_VAR_##method(name); }
+
+// Implement a member function that runs `method` on the metric variable, with 
an argument.
+#define METRIC_DEFINE_ONE_ARG(method, name, arg_type)                          
                    \
+    void METRIC_FUNC_NAME_##method(name)(arg_type arg) { 
METRIC_VAR_##method(name, arg); }
+
+// Implement a member function that runs `method` on the metric variable if 
NOT NULL,
+// with an argument.
+#define METRIC_DEFINE_ONE_ARG_NOTNULL(method, name, arg_type)                  
                    \
+    void METRIC_FUNC_NAME_##method(name)(arg_type arg)                         
                    \
+    {                                                                          
                    \
+        if (METRIC_VAR_NAME(name) != nullptr) {                                
                    \
+            METRIC_VAR_##method(name, arg);                                    
                    \
+        }                                                                      
                    \
+    }
+
+// Call the member function of `obj` to run `method` on the metric variable.
+#define METRIC_CALL(obj, method, name, ...) 
(obj).METRIC_FUNC_NAME_##method(name)(__VA_ARGS__)
+
+// The name of the member function that increments the metric variable by some 
value.
 #define METRIC_FUNC_NAME_INCREMENT_BY(name) increment_##name##_by
 
-#define METRIC_DEFINE_INCREMENT_BY(name)                                       
                    \
-    void METRIC_FUNC_NAME_INCREMENT_BY(name)(int64_t x) { 
METRIC_VAR_INCREMENT_BY(name, x); }
+// Implement a member function that increments the metric variable by some 
value.
+#define METRIC_DEFINE_INCREMENT_BY(name) METRIC_DEFINE_ONE_ARG(INCREMENT_BY, 
name, int64_t)
 
 // To be adaptive to self-defined `increment_by` methods, arguments are 
declared as variadic.
-#define METRIC_INCREMENT_BY(obj, name, ...) 
(obj).METRIC_FUNC_NAME_INCREMENT_BY(name)(__VA_ARGS__)
+#define METRIC_INCREMENT_BY(obj, name, ...) METRIC_CALL(obj, INCREMENT_BY, 
name, ##__VA_ARGS__)
 
+// The name of the member function that increments the metric variable by one.
 #define METRIC_FUNC_NAME_INCREMENT(name) increment_##name
 
-#define METRIC_DEFINE_INCREMENT(name)                                          
                    \
-    void METRIC_FUNC_NAME_INCREMENT(name)() { METRIC_VAR_INCREMENT(name); }
+// Implement a member function that increments the metric variable by one.
+#define METRIC_DEFINE_INCREMENT(name) METRIC_DEFINE_NO_ARG(INCREMENT, name)
+
+// Implement a member function that increments the metric variable by one if 
NOT NULL.
+#define METRIC_DEFINE_INCREMENT_NOTNULL(name) 
METRIC_DEFINE_NO_ARG_NOTNULL(INCREMENT, name)
 
 // To be adaptive to self-defined `increment` methods, arguments are declared 
as variadic.
-#define METRIC_INCREMENT(obj, name, ...) 
(obj).METRIC_FUNC_NAME_INCREMENT(name)(__VA_ARGS__)
+#define METRIC_INCREMENT(obj, name, ...) METRIC_CALL(obj, INCREMENT, name, 
##__VA_ARGS__)
+
+// The name of the member function that decrements the metric variable by one.
+#define METRIC_FUNC_NAME_DECREMENT(name) decrement_##name
 
+// Implement a member function that decrements the metric variable by one.
+#define METRIC_DEFINE_DECREMENT(name) METRIC_DEFINE_NO_ARG(DECREMENT, name)
+
+// Implement a member function that decrements the metric variable by one if 
NOT NULL.
+#define METRIC_DEFINE_DECREMENT_NOTNULL(name) 
METRIC_DEFINE_NO_ARG_NOTNULL(DECREMENT, name)
+
+// To be adaptive to self-defined `decrement` methods, arguments are declared 
as variadic.
+#define METRIC_DECREMENT(obj, name, ...) METRIC_CALL(obj, DECREMENT, name, 
##__VA_ARGS__)
+
+// The name of the member function that sets the metric variable with some 
value.
 #define METRIC_FUNC_NAME_SET(name) set_##name
 
-#define METRIC_DEFINE_SET(name, value_type)                                    
                    \
-    void METRIC_FUNC_NAME_SET(name)(value_type value) { METRIC_VAR_SET(name, 
value); }
+// Implement a member function that sets the metric variable with some value.
+#define METRIC_DEFINE_SET(name, value_type) METRIC_DEFINE_ONE_ARG(SET, name, 
value_type)
+
+// Implement a member function that sets the metric variable with some value 
if NOT NULL.
+#define METRIC_DEFINE_SET_NOTNULL(name, value_type)                            
                    \
+    METRIC_DEFINE_ONE_ARG_NOTNULL(SET, name, value_type)
 
 // To be adaptive to self-defined `set` methods, arguments are declared as 
variadic.
-#define METRIC_SET(obj, name, ...) 
(obj).METRIC_FUNC_NAME_SET(name)(__VA_ARGS__)
+#define METRIC_SET(obj, name, ...) METRIC_CALL(obj, SET, name, ##__VA_ARGS__)
 
+// The name of the member function that gets the value of the metric variable.
 #define METRIC_FUNC_NAME_VALUE(name) get_##name
 
-#define METRIC_DEFINE_VALUE(name, value_type)                                  
                    \
-    value_type METRIC_FUNC_NAME_VALUE(name)() { return METRIC_VAR_VALUE(name); 
}
+// Implement a member function that gets the value of the metric variable.
+#define METRIC_DEFINE_VALUE(name, value_type) 
METRIC_DEFINE_RET_AND_NO_ARG(value_type, VALUE, name)
 
 // To be adaptive to self-defined `value` methods, arguments are declared as 
variadic.
-#define METRIC_VALUE(obj, name, ...) 
(obj).METRIC_FUNC_NAME_VALUE(name)(__VA_ARGS__)
+#define METRIC_VALUE(obj, name, ...) METRIC_CALL(obj, VALUE, name, 
##__VA_ARGS__)
 
 namespace dsn {
 class metric;                  // IWYU pragma: keep


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to