This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch add-aggregated-metrics-for-rexi-server-and-buffer
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 23a53844ef86f94d7a46ee593e78c9e0940cb882
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Jul 22 12:27:08 2024 -0400

    Add aggregate rexi server and rexi buffer message queue metrics
    
    We have aggregate couch_server and index server metrics, but rexi server and
    buffer metrics, which are also sharded, were missing, so this commit adds them.
    
    In addition, I noticed we didn't have a test for the `/_system` endpoint, so I
    added one which checks most of the functionality: base VM metrics, computed,
    aggregated, and simple message queue lengths, etc.
---
 src/chttpd/src/chttpd_node.erl             | 29 +++++++++++-----
 src/chttpd/test/eunit/chttpd_misc_test.erl | 53 ++++++++++++++++++++++++++++++
 src/rexi/src/rexi.erl                      |  7 ++++
 src/rexi/src/rexi_server_mon.erl           | 17 +++++++++-
 4 files changed, 96 insertions(+), 10 deletions(-)

diff --git a/src/chttpd/src/chttpd_node.erl b/src/chttpd/src/chttpd_node.erl
index e0e8fe0ef..fa72c83ce 100644
--- a/src/chttpd/src/chttpd_node.erl
+++ b/src/chttpd/src/chttpd_node.erl
@@ -405,18 +405,29 @@ get_distribution_stats() ->
 message_queues() ->
     MessageQueuesAgg = [
         {couch_server, couch_server:aggregate_queue_len()},
-        {index_server, couch_index_server:aggregate_queue_len()}
+        {index_server, couch_index_server:aggregate_queue_len()},
+        {rexi_server, rexi:aggregate_server_queue_len()},
+        {rexi_buffer, rexi:aggregate_buffer_queue_len()}
     ],
-    MessageQueuesReg = lists:map(
-        fun(Name) ->
-            Type = message_queue_len,
-            {Type, Length} = process_info(whereis(Name), Type),
-            {Name, Length}
-        end,
-        registered()
-    ),
+    MessageQueuesReg = lists:filtermap(fun message_queue/1, registered()),
     MessageQueuesAgg ++ MessageQueuesReg.
 
+message_queue(rexi_server) ->
+    % Compatibility clause. Remove in 3.4+ version when singleton
+    % rexi_server is removed.
+    false;
+message_queue(Name) ->
+    Pid = whereis(Name),
+    case is_pid(Pid) of
+        true ->
+            case process_info(Pid, message_queue_len) of
+                {message_queue_len, Length} -> {true, {Name, Length}};
+                _ -> false
+            end;
+        false ->
+            false
+    end.
+
 %% Workaround for https://bugs.erlang.org/browse/ERL-1355
 run_queues() ->
     case erlang:system_info(dirty_cpu_schedulers) > 0 of
diff --git a/src/chttpd/test/eunit/chttpd_misc_test.erl b/src/chttpd/test/eunit/chttpd_misc_test.erl
index 030ab1760..cc54eb4d3 100644
--- a/src/chttpd/test/eunit/chttpd_misc_test.erl
+++ b/src/chttpd/test/eunit/chttpd_misc_test.erl
@@ -181,6 +181,59 @@ t_multiple_uuids(Url) ->
     ?assert(Uuid1 /= Uuid2),
     ?assert(Uuid1 /= Uuid3).
 
+system_test_() ->
+    {
+        "chttpd _node/_local/_system endpoint tests",
+        {
+            setup,
+            fun chttpd_test_util:start_couch/0,
+            fun chttpd_test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0,
+                fun teardown/1,
+                [
+                    ?TDEF_FE(t_system)
+                ]
+            }
+        }
+    }.
+
+t_system(Url) ->
+    {ok, Code, _, Body} = req_get(Url ++ "/_node/_local/_system"),
+    ?assertEqual(200, Code),
+    % The body is quite large, so test a general subset of functionality:
+    %   - Metrics from the VM (context_switches), some simple, and some histograms
+    %   - Custom computed metrics like internal_replication_jobs
+    %   - Simple, histogrammed and aggregated message queues, from the vm/dependencies/couch services
+    %
+    ?assertMatch(
+        #{
+            <<"context_switches">> := _,
+            <<"distribution">> := #{},
+            <<"internal_replication_jobs">> := _,
+            <<"memory">> := #{
+                <<"binary">> := _,
+                <<"processes">> := _
+            },
+            <<"message_queues">> := #{
+                <<"couch_db_updater">> := #{},
+                <<"couch_event_server">> := _,
+                <<"couch_file">> := #{},
+                <<"couch_server">> := _,
+                <<"couch_server_1">> := _,
+                <<"ibrowse">> := _,
+                <<"index_server">> := _,
+                <<"index_server_1">> := _,
+                <<"init">> := _,
+                <<"logger">> := _,
+                <<"rexi_buffer">> := _,
+                <<"rexi_server">> := _
+            }
+        },
+        Body
+    ).
+
 req_get(Url) ->
     {ok, Code, Headers, Body} = test_request:get(Url, [?CONTENT_JSON, ?AUTH]),
     {ok, Code, Headers, jiffy:decode(Body, [return_maps])}.
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index 7d09c977d..4d5736530 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -18,6 +18,7 @@
 -export([stream_ack/1]).
 -export([stream2/1, stream_last/1, stream_last/2]).
 -export([ping/0]).
+-export([aggregate_server_queue_len/0, aggregate_buffer_queue_len/0]).
 
 %% @equiv cast(Node, self(), MFA)
 -spec cast(node(), {atom(), atom(), list()}) -> reference().
@@ -159,6 +160,12 @@ ping() ->
     {Caller, _} = get(rexi_from),
     erlang:send(Caller, {rexi, '$rexi_ping'}).
 
+aggregate_server_queue_len() ->
+    rexi_server_mon:aggregate_queue_len(rexi_server).
+
+aggregate_buffer_queue_len() ->
+    rexi_server_mon:aggregate_queue_len(rexi_buffer).
+
 %% internal functions %%
 
 cast_msg(Msg) -> {'$gen_cast', Msg}.
diff --git a/src/rexi/src/rexi_server_mon.erl b/src/rexi/src/rexi_server_mon.erl
index eb8683a95..7156a5584 100644
--- a/src/rexi/src/rexi_server_mon.erl
+++ b/src/rexi/src/rexi_server_mon.erl
@@ -18,7 +18,8 @@
 
 -export([
     start_link/1,
-    status/0
+    status/0,
+    aggregate_queue_len/1
 ]).
 
 -export([
@@ -42,6 +43,9 @@ start_link(ChildMod) ->
 status() ->
     gen_server:call(?MODULE, status).
 
+aggregate_queue_len(ChildMod) ->
+    lists:sum([message_queue_len(ServerId) || ServerId <- server_ids(ChildMod)]).
+
 % Mem3 cluster callbacks
 
 cluster_unstable(Server) ->
@@ -151,3 +155,14 @@ stop_server(ChildMod, ChildId) ->
 
 sup_module(ChildMod) ->
     list_to_atom(lists:concat([ChildMod, "_sup"])).
+
+message_queue_len(ServerId) when is_atom(ServerId) ->
+    case whereis(ServerId) of
+        Pid when is_pid(Pid) ->
+            case process_info(Pid, message_queue_len) of
+                {message_queue_len, Length} -> Length;
+                _ -> 0
+            end;
+        _ ->
+            0
+    end.

Reply via email to