This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch add-aggregated-metrics-for-rexi-server-and-buffer
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 23a53844ef86f94d7a46ee593e78c9e0940cb882
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Jul 22 12:27:08 2024 -0400

    Add aggregate rexi server and rexi buffer message queue metrics
    
    We have aggregate couch_server and index server metrics, but rexi server and
    buffers, which are also sharded were missing, so this commit adds them.
    
    In addition noticed we didn't have an test for `/_system` test so added one
    which check most of the functionality: base VM metrics, computed, aggregated
    and simple message queue length, etc.
---
 src/chttpd/src/chttpd_node.erl             | 29 +++++++++++-----
 src/chttpd/test/eunit/chttpd_misc_test.erl | 53 ++++++++++++++++++++++++++++++
 src/rexi/src/rexi.erl                      |  7 ++++
 src/rexi/src/rexi_server_mon.erl           | 17 +++++++++-
 4 files changed, 96 insertions(+), 10 deletions(-)

diff --git a/src/chttpd/src/chttpd_node.erl b/src/chttpd/src/chttpd_node.erl
index e0e8fe0ef..fa72c83ce 100644
--- a/src/chttpd/src/chttpd_node.erl
+++ b/src/chttpd/src/chttpd_node.erl
@@ -405,18 +405,29 @@ get_distribution_stats() ->
 message_queues() ->
     MessageQueuesAgg = [
         {couch_server, couch_server:aggregate_queue_len()},
-        {index_server, couch_index_server:aggregate_queue_len()}
+        {index_server, couch_index_server:aggregate_queue_len()},
+        {rexi_server, rexi:aggregate_server_queue_len()},
+        {rexi_buffer, rexi:aggregate_buffer_queue_len()}
     ],
-    MessageQueuesReg = lists:map(
-        fun(Name) ->
-            Type = message_queue_len,
-            {Type, Length} = process_info(whereis(Name), Type),
-            {Name, Length}
-        end,
-        registered()
-    ),
+    MessageQueuesReg = lists:filtermap(fun message_queue/1, registered()),
     MessageQueuesAgg ++ MessageQueuesReg.
 
+message_queue(rexi_server) ->
+    % Compatibility clause. Remove in 3.4+ version when singleton
+    % rexi_server is removed.
+    false;
+message_queue(Name) ->
+    Pid = whereis(Name),
+    case is_pid(Pid) of
+        true ->
+            case process_info(Pid, message_queue_len) of
+                {message_queue_len, Length} -> {true, {Name, Length}};
+                _ -> false
+            end;
+        false ->
+            false
+    end.
+
 %% Workaround for https://bugs.erlang.org/browse/ERL-1355
 run_queues() ->
     case erlang:system_info(dirty_cpu_schedulers) > 0 of
diff --git a/src/chttpd/test/eunit/chttpd_misc_test.erl b/src/chttpd/test/eunit/chttpd_misc_test.erl
index 030ab1760..cc54eb4d3 100644
--- a/src/chttpd/test/eunit/chttpd_misc_test.erl
+++ b/src/chttpd/test/eunit/chttpd_misc_test.erl
@@ -181,6 +181,59 @@ t_multiple_uuids(Url) ->
     ?assert(Uuid1 /= Uuid2),
     ?assert(Uuid1 /= Uuid3).
 
+system_test_() ->
+    {
+        "chttpd _node/_local/_system endpoint tests",
+        {
+            setup,
+            fun chttpd_test_util:start_couch/0,
+            fun chttpd_test_util:stop_couch/1,
+            {
+                foreach,
+                fun setup/0,
+                fun teardown/1,
+                [
+                    ?TDEF_FE(t_system)
+                ]
+            }
+        }
+    }.
+
+t_system(Url) ->
+    {ok, Code, _, Body} = req_get(Url ++ "/_node/_local/_system"),
+    ?assertEqual(200, Code),
+    % The body is quite large, so test a general subset of functionality:
+    % - Metrics from the VM (context_switches), some simple, and some histograms
+    % - Custom computed metrics like internal_replication_jobs
+    % - Simple, histogrammed and aggregated message queues, from the vm/dependencies/couch services
+    %
+    ?assertMatch(
+        #{
+            <<"context_switches">> := _,
+            <<"distribution">> := #{},
+            <<"internal_replication_jobs">> := _,
+            <<"memory">> := #{
+                <<"binary">> := _,
+                <<"processes">> := _
+            },
+            <<"message_queues">> := #{
+                <<"couch_db_updater">> := #{},
+                <<"couch_event_server">> := _,
+                <<"couch_file">> := #{},
+                <<"couch_server">> := _,
+                <<"couch_server_1">> := _,
+                <<"ibrowse">> := _,
+                <<"index_server">> := _,
+                <<"index_server_1">> := _,
+                <<"init">> := _,
+                <<"logger">> := _,
+                <<"rexi_buffer">> := _,
+                <<"rexi_server">> := _
+            }
+        },
+        Body
+    ).
+
 req_get(Url) ->
     {ok, Code, Headers, Body} = test_request:get(Url, [?CONTENT_JSON, ?AUTH]),
     {ok, Code, Headers, jiffy:decode(Body, [return_maps])}.
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index 7d09c977d..4d5736530 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -18,6 +18,7 @@
 -export([stream_ack/1]).
 -export([stream2/1, stream_last/1, stream_last/2]).
 -export([ping/0]).
+-export([aggregate_server_queue_len/0, aggregate_buffer_queue_len/0]).
 
 %% @equiv cast(Node, self(), MFA)
 -spec cast(node(), {atom(), atom(), list()}) -> reference().
@@ -159,6 +160,12 @@ ping() ->
     {Caller, _} = get(rexi_from),
     erlang:send(Caller, {rexi, '$rexi_ping'}).
 
+aggregate_server_queue_len() ->
+    rexi_server_mon:aggregate_queue_len(rexi_server).
+
+aggregate_buffer_queue_len() ->
+    rexi_server_mon:aggregate_queue_len(rexi_buffer).
+
 %% internal functions %%
 
 cast_msg(Msg) -> {'$gen_cast', Msg}.
diff --git a/src/rexi/src/rexi_server_mon.erl b/src/rexi/src/rexi_server_mon.erl
index eb8683a95..7156a5584 100644
--- a/src/rexi/src/rexi_server_mon.erl
+++ b/src/rexi/src/rexi_server_mon.erl
@@ -18,7 +18,8 @@
 
 -export([
     start_link/1,
-    status/0
+    status/0,
+    aggregate_queue_len/1
 ]).
 
 -export([
@@ -42,6 +43,9 @@ start_link(ChildMod) ->
 status() ->
     gen_server:call(?MODULE, status).
 
+aggregate_queue_len(ChildMod) ->
+    lists:sum([message_queue_len(ServerId) || ServerId <- server_ids(ChildMod)]).
+
 % Mem3 cluster callbacks
 
 cluster_unstable(Server) ->
@@ -151,3 +155,14 @@ stop_server(ChildMod, ChildId) ->
 
 sup_module(ChildMod) ->
     list_to_atom(lists:concat([ChildMod, "_sup"])).
+
+message_queue_len(ServerId) when is_atom(ServerId) ->
+    case whereis(ServerId) of
+        Pid when is_pid(Pid) ->
+            case process_info(Pid, message_queue_len) of
+                {message_queue_len, Length} -> Length;
+                _ -> 0
+            end;
+        _ ->
+            0
+    end.
