This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ec23c34  Return detailed replication stats for running and pending jobs
ec23c34 is described below

commit ec23c342b722db289c3dd54fc61221ae26dec7b5
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Thu Oct 31 11:58:39 2019 -0400

    Return detailed replication stats for running and pending jobs
    
    Previously `_scheduler/docs` returned detailed replication statistics for
    completed jobs only. To get the same level of detail for running or pending
    jobs, users had to use `_active_tasks`, which is not optimal and required
    jumping between monitoring endpoints.
    
    The `info` field was originally meant to hold these statistics, but they were
    never implemented and it just returned `null` as a placeholder. With work on
    3.0 being finalized, this is a good time to add this improvement and avoid
    changing the API afterwards.
    
    Just updating `_scheduler/docs` was not quite enough, since replications
    started from the `_replicate` endpoint would not be visible there and users
    would still have to access `_active_tasks` to inspect them. So let's add
    the `info` field to `_scheduler/jobs` as well.
    
    After this update, all states and status details from `_active_tasks` and
    `_replicator` docs should be available under `_scheduler/jobs` and
    `_scheduler/docs` endpoints.
---
 .../src/couch_replicator_doc_processor.erl         | 11 +----
 .../src/couch_replicator_scheduler.erl             | 21 ++++++---
 .../src/couch_replicator_scheduler_job.erl         | 16 +++----
 .../src/couch_replicator_stats.erl                 | 50 ++++++++++-----------
 .../src/couch_replicator_utils.erl                 | 16 ++++++-
 ...ch_replicator_retain_stats_between_job_runs.erl | 51 ++++++++++++++++++++--
 6 files changed, 107 insertions(+), 58 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_doc_processor.erl b/src/couch_replicator/src/couch_replicator_doc_processor.erl
index 23cdeea..29170ed 100644
--- a/src/couch_replicator/src/couch_replicator_doc_processor.erl
+++ b/src/couch_replicator/src/couch_replicator_doc_processor.erl
@@ -533,15 +533,6 @@ doc_lookup(Db, DocId, HealthThreshold) ->
     end.
 
 
--spec ejson_state_info(binary() | nil) -> binary() | null.
-ejson_state_info(nil) ->
-    null;
-ejson_state_info(Info) when is_binary(Info) ->
-    Info;
-ejson_state_info(Info) ->
-    couch_replicator_utils:rep_error_to_binary(Info).
-
-
 -spec ejson_rep_id(rep_id() | nil) -> binary() | null.
 ejson_rep_id(nil) ->
     null;
@@ -579,7 +570,7 @@ ejson_doc(#rdoc{state = RepState} = RDoc, _HealthThreshold) ->
         {database, DbName},
         {id, ejson_rep_id(RepId)},
         {state, RepState},
-        {info, ejson_state_info(StateInfo)},
+        {info, couch_replicator_utils:ejson_state_info(StateInfo)},
         {error_count, ErrorCount},
         {node, node()},
         {last_updated, couch_replicator_utils:iso8601(StateTime)},
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index c9da377..d534973 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -148,19 +148,19 @@ job_summary(JobId, HealthThreshold) ->
                         [{{crashed, Error}, _When} | _] ->
                             {crashing, crash_reason_json(Error)};
                         [_ | _] ->
-                            {pending, null}
+                            {pending, Rep#rep.stats}
                     end;
                 {undefined, ErrorCount} when ErrorCount > 0 ->
                      [{{crashed, Error}, _When} | _] = History,
                      {crashing, crash_reason_json(Error)};
                 {Pid, ErrorCount} when is_pid(Pid) ->
-                     {running, null}
+                     {running, Rep#rep.stats}
             end,
             [
                 {source, iolist_to_binary(ejson_url(Rep#rep.source))},
                 {target, iolist_to_binary(ejson_url(Rep#rep.target))},
                 {state, State},
-                {info, Info},
+                {info, couch_replicator_utils:ejson_state_info(Info)},
                 {error_count, ErrorCount},
                 {last_updated, last_updated(History)},
                 {start_time,
@@ -829,6 +829,7 @@ job_ejson(Job) ->
         {database, Rep#rep.db_name},
         {user, (Rep#rep.user_ctx)#user_ctx.name},
         {doc_id, Rep#rep.doc_id},
+        {info, couch_replicator_utils:ejson_state_info(Rep#rep.stats)},
         {history, History},
         {node, node()},
         {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)}
@@ -1431,7 +1432,12 @@ t_job_summary_running() ->
         Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
         ?assertEqual(running, proplists:get_value(state, Summary)),
         ?assertEqual(null, proplists:get_value(info, Summary)),
-        ?assertEqual(0, proplists:get_value(error_count, Summary))
+        ?assertEqual(0, proplists:get_value(error_count, Summary)),
+
+        Stats = [{source_seq, <<"1-abc">>}],
+        handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
+        Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+        ?assertEqual({Stats}, proplists:get_value(info, Summary1))
     end).
 
 
@@ -1447,7 +1453,12 @@ t_job_summary_pending() ->
         Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
         ?assertEqual(pending, proplists:get_value(state, Summary)),
         ?assertEqual(null, proplists:get_value(info, Summary)),
-        ?assertEqual(0, proplists:get_value(error_count, Summary))
+        ?assertEqual(0, proplists:get_value(error_count, Summary)),
+
+        Stats = [{doc_write_failures, 1}],
+        handle_cast({update_job_stats, job1, Stats}, mock_state(1)),
+        Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC),
+        ?assertEqual({Stats}, proplists:get_value(info, Summary1))
     end).
 
 
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 565a2bd..d69febb 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -600,7 +600,7 @@ init_state(Rep) ->
                                         ?DEFAULT_CHECKPOINT_INTERVAL),
         type = Type,
         view = View,
-        stats = Stats
+        stats = couch_replicator_stats:new(Stats)
     },
     State#rep_state{timer = start_timer(State)}.
 
@@ -949,20 +949,16 @@ get_pending_count_int(#rep_state{source = Db}=St) ->
 
 update_task(State) ->
     #rep_state{
+        rep_details = #rep{id = JobId},
         current_through_seq = {_, ThroughSeq},
         highest_seq_done = {_, HighestSeq}
     } = State,
-    update_scheduler_job_stats(State),
-    couch_task_status:update(
-        rep_stats(State) ++ [
+    Status = rep_stats(State) ++ [
         {source_seq, HighestSeq},
         {through_seq, ThroughSeq}
-    ]).
-
-
-update_scheduler_job_stats(#rep_state{rep_details = Rep, stats = Stats}) ->
-    JobId = Rep#rep.id,
-    couch_replicator_scheduler:update_job_stats(JobId, Stats).
+    ],
+    couch_replicator_scheduler:update_job_stats(JobId, Status),
+    couch_task_status:update(Status).
 
 
 rep_stats(State) ->
diff --git a/src/couch_replicator/src/couch_replicator_stats.erl b/src/couch_replicator/src/couch_replicator_stats.erl
index af8ba4e..cd62949 100644
--- a/src/couch_replicator/src/couch_replicator_stats.erl
+++ b/src/couch_replicator/src/couch_replicator_stats.erl
@@ -12,14 +12,6 @@
 
 -module(couch_replicator_stats).
 
--record(rep_stats, {
-    missing_checked = 0,
-    missing_found = 0,
-    docs_read = 0,
-    docs_written = 0,
-    doc_write_failures = 0
-}).
-
 -export([
     new/0,
     new/1,
@@ -39,26 +31,27 @@
 new() ->
     orddict:new().
 
-new(Initializers) when is_list(Initializers) ->
-    orddict:from_list(Initializers).
+new(Initializers0) when is_list(Initializers0) ->
+    Initializers1 = lists:filtermap(fun fmap/1, Initializers0),
+    orddict:from_list(Initializers1).
 
 missing_checked(Stats) ->
-    get(missing_checked, upgrade(Stats)).
+    get(missing_checked, Stats).
 
 missing_found(Stats) ->
-    get(missing_found, upgrade(Stats)).
+    get(missing_found, Stats).
 
 docs_read(Stats) ->
-    get(docs_read, upgrade(Stats)).
+    get(docs_read, Stats).
 
 docs_written(Stats) ->
-    get(docs_written, upgrade(Stats)).
+    get(docs_written, Stats).
 
 doc_write_failures(Stats) ->
-    get(doc_write_failures, upgrade(Stats)).
+    get(doc_write_failures, Stats).
 
 get(Field, Stats) ->
-    case orddict:find(Field, upgrade(Stats)) of
+    case orddict:find(Field, Stats) of
         {ok, Value} ->
             Value;
         error ->
@@ -66,18 +59,19 @@ get(Field, Stats) ->
     end.
 
 increment(Field, Stats) ->
-    orddict:update_counter(Field, 1, upgrade(Stats)).
+    orddict:update_counter(Field, 1, Stats).
 
 sum_stats(S1, S2) ->
-    orddict:merge(fun(_, V1, V2) -> V1+V2 end, upgrade(S1), upgrade(S2)).
+    orddict:merge(fun(_, V1, V2) -> V1+V2 end, S1, S2).
+
 
-upgrade(#rep_stats{} = Stats) ->
-    orddict:from_list([
-        {missing_checked, Stats#rep_stats.missing_checked},
-        {missing_found, Stats#rep_stats.missing_found},
-        {docs_read, Stats#rep_stats.docs_read},
-        {docs_written, Stats#rep_stats.docs_written},
-        {doc_write_failures, Stats#rep_stats.doc_write_failures}
-    ]);
-upgrade(Stats) ->
-    Stats.
+% Handle initializing from a status object which uses same values but different
+% field names.
+fmap({revisions_checked, V})       -> {true, {missing_checked, V}};
+fmap({missing_revisions_found, V}) -> {true, {missing_found, V}};
+fmap({missing_checked, _})         -> true;
+fmap({missing_found, _})           -> true;
+fmap({docs_read, _})               -> true;
+fmap({docs_written, _})            -> true;
+fmap({doc_write_failures, _})      -> true;
+fmap({_, _})                       -> false.
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
index ccf2413..856c1b5 100644
--- a/src/couch_replicator/src/couch_replicator_utils.erl
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -24,7 +24,8 @@
    iso8601/1,
    filter_state/3,
    remove_basic_auth_from_headers/1,
-   normalize_rep/1
+   normalize_rep/1,
+   ejson_state_info/1
 ]).
 
 
@@ -176,6 +177,19 @@ normalize_rep(#rep{} = Rep)->
     }.
 
 
+-spec ejson_state_info(term()) -> binary() | null | {[{atom(), term()}]}.
+ejson_state_info(nil) ->
+    null;
+ejson_state_info(Info) when is_binary(Info) ->
+    Info;
+ejson_state_info([]) ->
+    null;  % Status not set yet => null for compatibility reasons
+ejson_state_info([{_, _} | _] = Info) ->
+    {Info};  % Stats/status proplist => EJSON object
+ejson_state_info(Info) ->
+    rep_error_to_binary(Info).  % Any other term is rendered as an error
+
+
 -ifdef(TEST).
 
 -include_lib("eunit/include/eunit.hrl").
diff --git a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
index 3b7377b..9dd86b3 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_retain_stats_between_job_runs.erl
@@ -23,7 +23,7 @@
 
 
 setup() ->
-    Ctx = test_util:start_couch([couch_replicator]),
+    Ctx = test_util:start_couch([couch_replicator, chttpd, mem3, fabric]),
     Source = setup_db(),
     Target = setup_db(),
     {Ctx, {Source, Target}}.
@@ -49,10 +49,17 @@ t_stats_retained({_Ctx, {Source, Target}}) ->
     ?_test(begin
         populate_db(Source, 42),
         {ok, RepPid, RepId} = replicate(Source, Target),
+
         wait_target_in_sync(Source, Target),
         check_active_tasks(42, 42),
-        reschedule_job(RepPid),
+        check_scheduler_jobs(42, 42),
+
+        stop_job(RepPid),
+        check_scheduler_jobs(42, 42),
+
+        start_job(),
         check_active_tasks(42, 42),
+        check_scheduler_jobs(42, 42),
         couch_replicator_scheduler:remove_job(RepId)
     end).
 
@@ -69,7 +76,7 @@ teardown_db(DbName) ->
     ok.
 
 
-reschedule_job(RepPid) ->
+stop_job(RepPid) ->
     Ref = erlang:monitor(process, RepPid),
     gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 0}),
     couch_replicator_scheduler:reschedule(),
@@ -77,7 +84,10 @@ reschedule_job(RepPid) ->
         {'DOWN', Ref, _, _, _} -> ok
     after ?TIMEOUT ->
         erlang:error(timeout)
-    end,
+    end.
+
+
+start_job() ->
     gen_server:cast(couch_replicator_scheduler, {set_max_jobs, 500}),
     couch_replicator_scheduler:reschedule().
 
@@ -89,6 +99,20 @@ check_active_tasks(DocsRead, DocsWritten) ->
     ?assertEqual(DocsWritten, couch_util:get_value(docs_written, RepTask)).
 
 
+check_scheduler_jobs(DocsRead, DocsWritten) ->
+    Info = wait_scheduler_info(),
+    ?assert(maps:is_key(<<"changes_pending">>, Info)),
+    ?assert(maps:is_key(<<"doc_write_failures">>, Info)),
+    ?assert(maps:is_key(<<"docs_read">>, Info)),
+    ?assert(maps:is_key(<<"docs_written">>, Info)),
+    ?assert(maps:is_key(<<"missing_revisions_found">>, Info)),
+    ?assert(maps:is_key(<<"checkpointed_source_seq">>, Info)),
+    ?assert(maps:is_key(<<"source_seq">>, Info)),
+    ?assert(maps:is_key(<<"revisions_checked">>, Info)),
+    ?assertMatch(#{<<"docs_read">> := DocsRead}, Info),
+    ?assertMatch(#{<<"docs_written">> := DocsWritten}, Info).
+
+
 replication_tasks() ->
     lists:filter(fun(P) ->
         couch_util:get_value(type, P) =:= replication
@@ -104,6 +128,16 @@ wait_for_task_status() ->
     end).
 
 
+wait_scheduler_info() ->
+    test_util:wait(fun() ->
+        case scheduler_jobs() of
+            [] -> wait;
+            [#{<<"info">> := null}] -> wait;
+            [#{<<"info">> := Info}] -> Info
+        end
+    end).
+
+
 populate_db(DbName, DocCount) ->
     {ok, Db} = couch_db:open_int(DbName, []),
     Docs = lists:foldl(
@@ -158,3 +192,12 @@ replicate(Source, Target) ->
     couch_replicator_scheduler:reschedule(),
     Pid = couch_replicator_test_helper:get_pid(Rep#rep.id),
     {ok, Pid, Rep#rep.id}.
+
+
+scheduler_jobs() ->  % Decoded `jobs` list from GET /_scheduler/jobs
+    Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
+    Port = mochiweb_socket_server:get(chttpd, port),
+    Url = lists:flatten(io_lib:format("http://~s:~b/_scheduler/jobs", [Addr, Port])),
+    {ok, 200, _, Body} = test_request:get(Url, []),
+    Json = jiffy:decode(Body, [return_maps]),
+    maps:get(<<"jobs">>, Json).

Reply via email to