This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new 641b39373 Optimize and clean up couch_multidb_changes
641b39373 is described below

commit 641b39373d540fbb8eb4c31700ff88d2503a7b98
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Dec 12 12:11:45 2023 -0500

    Optimize and clean up couch_multidb_changes
    
    couch_multidb_changes is in charge of monitoring changes to multiple
    databases. It is what drives the couch_replicator application when users
    update replication docs. It starts off by scanning all the local db shards
    and launches changes feeds for each of them. After it processes each
    changes feed, it checkpoints where it stopped. As the shards are updated,
    it starts changes feeds for those updated shards and checkpoints again.
    The checkpoints are kept in an ets table, and the main logic which decides
    when to start a changes feed for a particular shard is in the
    resume_scan/2 function.
    
    This change makes a few small optimizations:
    
     * Use a map to track Pid -> DbName mappings. This avoids an O(N)
       operation when looking up existing change feed processes. So `pids` is
       switched to be a map of `#{Pid => DbName}`, and the ets table has a 4th
       tuple member to keep track of `DbName -> Pid` mappings.
    
     * Previously, when the plain "suffix" dbname (just <<"_replicator">>) was
       found, we opened and closed it just to check whether it existed. Use
       `couch_server:exists/1` instead.
    
    There are also a few clean-ups:
    
     * Update tests to use ?TDEF_FE/?TDEF macros. This shortens them a bit and
       saves one indentation level.
    
     * Increase test coverage from 86% to 96%. To help create a few more test
       scenarios, switch to using a public ets table instead of a protected one.
---
 src/couch/src/couch_multidb_changes.erl            | 835 ++++++++++-----------
 .../couch_replicator_scheduler_docs_tests.erl      |  14 +-
 2 files changed, 415 insertions(+), 434 deletions(-)

diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl
index a9b4c4fb6..ecb7e1bfe 100644
--- a/src/couch/src/couch_multidb_changes.erl
+++ b/src/couch/src/couch_multidb_changes.erl
@@ -46,7 +46,7 @@
     suffix :: binary(),
     event_server :: reference(),
     scanner :: nil | pid(),
-    pids :: [{binary(), pid()}],
+    pids :: #{},
     skip_ddocs :: boolean()
 }).
 
@@ -85,36 +85,34 @@ init([DbSuffix, Module, Context, Opts]) ->
     process_flag(trap_exit, true),
     Server = self(),
     {ok, #state{
-        tid = ets:new(?MODULE, [set, protected]),
+        tid = ets:new(?MODULE, [set, public]),
         mod = Module,
         ctx = Context,
         suffix = DbSuffix,
         event_server = register_with_event_server(Server),
         scanner = spawn_link(fun() -> scan_all_dbs(Server, DbSuffix) end),
-        pids = [],
+        pids = #{},
         skip_ddocs = proplists:is_defined(skip_ddocs, Opts)
     }}.
 
 terminate(_Reason, _State) ->
     ok.
 
-handle_call(
-    {change, DbName, Change},
-    _From,
-    #state{skip_ddocs = SkipDDocs, mod = Mod, ctx = Ctx} = State
-) ->
+handle_call({change, DbName, Change}, _From, #state{} = State) ->
+    #state{skip_ddocs = SkipDDocs, mod = Mod, ctx = Ctx} = State,
     case {SkipDDocs, is_design_doc(Change)} of
         {true, true} ->
             {reply, ok, State};
         {_, _} ->
             {reply, ok, State#state{ctx = Mod:db_change(DbName, Change, Ctx)}}
     end;
-handle_call({checkpoint, DbName, EndSeq}, _From, #state{tid = Ets} = State) ->
+handle_call({checkpoint, DbName, EndSeq}, {Pid, _Tag} = _From, #state{tid = 
Ets} = State) ->
     case ets:lookup(Ets, DbName) of
-        [] ->
-            true = ets:insert(Ets, {DbName, EndSeq, false});
-        [{DbName, _OldSeq, Rescan}] ->
-            true = ets:insert(Ets, {DbName, EndSeq, Rescan})
+        [{DbName, _OldSeq, Rescan, Pid}] ->
+            true = ets:insert(Ets, {DbName, EndSeq, Rescan, Pid});
+        _ ->
+            % Ignore stale checkpoints or checkpoints from unknown change feeds
+            ok
     end,
     {reply, ok, State}.
 
@@ -134,10 +132,10 @@ handle_info({'EXIT', From, normal}, #state{scanner = 
From} = State) ->
     {noreply, State#state{scanner = nil}};
 handle_info({'EXIT', From, Reason}, #state{scanner = From} = State) ->
     {stop, {scanner_died, Reason}, State};
-handle_info({'EXIT', From, Reason}, #state{pids = Pids} = State) ->
+handle_info({'EXIT', From, Reason}, #state{pids = #{} = Pids} = State) ->
     couch_log:debug("~p change feed exited ~p", [State#state.suffix, From]),
-    case lists:keytake(From, 2, Pids) of
-        {value, {DbName, From}, NewPids} ->
+    case maps:take(From, Pids) of
+        {DbName, NewPids} ->
             if
                 Reason == normal ->
                     ok;
@@ -147,14 +145,18 @@ handle_info({'EXIT', From, Reason}, #state{pids = Pids} = 
State) ->
             end,
             NewState = State#state{pids = NewPids},
             case ets:lookup(State#state.tid, DbName) of
-                [{DbName, _EndSeq, true}] ->
+                [{DbName, _EndSeq, true, From}] ->
+                    % Match the From pid explicitly and then clear it
+                    % The pid is at 4th position in the ets object
+                    ets:update_element(NewState#state.tid, DbName, {4, 
undefined}),
                     {noreply, resume_scan(DbName, NewState)};
+                [{DbName, _EndSeq, false, From}] ->
+                    ets:update_element(NewState#state.tid, DbName, {4, 
undefined}),
+                    {noreply, NewState};
                 _ ->
                     {noreply, NewState}
             end;
-        false when Reason == normal ->
-            {noreply, State};
-        false ->
+        error ->
             Fmt = "~s(~p) : Unknown pid ~w died :: ~w",
             couch_log:error(Fmt, [?MODULE, State#state.suffix, From, Reason]),
             {stop, {unexpected_exit, From, Reason}, State}
@@ -182,34 +184,28 @@ db_callback(_Other, _DbName, State) ->
     State.
 
 -spec resume_scan(binary(), #state{}) -> #state{}.
-resume_scan(DbName, #state{pids = Pids, tid = Ets} = State) ->
-    case {lists:keyfind(DbName, 1, Pids), ets:lookup(Ets, DbName)} of
-        {{DbName, _}, []} ->
-            % Found existing change feed, but not entry in ETS
-            % Flag a need to rescan from begining
-            true = ets:insert(Ets, {DbName, 0, true}),
-            State;
-        {{DbName, _}, [{DbName, EndSeq, _}]} ->
+resume_scan(DbName, #state{pids = #{} = Pids, tid = Ets} = State) ->
+    case ets:lookup(Ets, DbName) of
+        [{DbName, EndSeq, _, undefined}] ->
+            % No existing change feed running. Found existing checkpoint.
+            % Start a new change reader from last checkpoint.
+            Pid = start_changes_reader(DbName, EndSeq),
+            true = ets:insert(Ets, {DbName, EndSeq, false, Pid}),
+            State#state{pids = Pids#{Pid => DbName}};
+        [{DbName, EndSeq, _, Pid}] ->
             % Found existing change feed and entry in ETS
             % Flag a need to rescan from last ETS checkpoint
-            true = ets:insert(Ets, {DbName, EndSeq, true}),
+            true = ets:insert(Ets, {DbName, EndSeq, true, Pid}),
             State;
-        {false, []} ->
-            % No existing change feed running. No entry in ETS.
-            % This is first time seeing this db shard.
-            % Notify user with a found callback. Insert checkpoint
-            % entry in ETS to start from 0. And start a change feed.
-            true = ets:insert(Ets, {DbName, 0, false}),
+        [] ->
+            % No entry in ETS. This is first time seeing this db shard. Notify
+            % user with a found callback. Insert checkpoint entry in ETS to
+            % start from 0. And start a change feed.
+            Pid = start_changes_reader(DbName, 0),
+            true = ets:insert(Ets, {DbName, 0, false, Pid}),
             Mod = State#state.mod,
             Ctx = Mod:db_found(DbName, State#state.ctx),
-            Pid = start_changes_reader(DbName, 0),
-            State#state{ctx = Ctx, pids = [{DbName, Pid} | Pids]};
-        {false, [{DbName, EndSeq, _}]} ->
-            % No existing change feed running. Found existing checkpoint.
-            % Start a new change reader from last checkpoint.
-            true = ets:insert(Ets, {DbName, EndSeq, false}),
-            Pid = start_changes_reader(DbName, EndSeq),
-            State#state{pids = [{DbName, Pid} | Pids]}
+            State#state{ctx = Ctx, pids = Pids#{Pid => DbName}}
     end.
 
 start_changes_reader(DbName, Since) ->
@@ -237,9 +233,7 @@ changes_reader_cb(_, _, Acc) ->
 
 scan_all_dbs(Server, DbSuffix) when is_pid(Server) ->
     ok = scan_local_db(Server, DbSuffix),
-    {ok, Db} = mem3_util:ensure_exists(
-        config:get("mem3", "shards_db", "_dbs")
-    ),
+    {ok, Db} = mem3_util:ensure_exists(shards_db()),
     ChangesFun = couch_changes:handle_db_changes(#changes_args{}, nil, Db),
     ChangesFun({fun scan_changes_cb/3, {Server, DbSuffix, 1}}),
     couch_db:close(Db).
@@ -251,7 +245,7 @@ scan_changes_cb({change, {Change}, _}, _, {_Server, 
DbSuffix, _Count} = Acc) ->
             Acc;
         _Else ->
             NameMatch = DbSuffix =:= couch_db:dbname_suffix(DbName),
-            case {NameMatch, couch_replicator_utils:is_deleted(Change)} of
+            case {NameMatch, is_deleted(Change)} of
                 {false, _} ->
                     Acc;
                 {true, true} ->
@@ -264,6 +258,12 @@ scan_changes_cb({change, {Change}, _}, _, {_Server, 
DbSuffix, _Count} = Acc) ->
 scan_changes_cb(_, _, Acc) ->
     Acc.
 
+is_deleted(Change) ->
+    couch_util:get_value(<<"deleted">>, Change, false).
+
+shards_db() ->
+    config:get("mem3", "shards_db", "_dbs").
+
 local_shards(DbName) ->
     try
         [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
@@ -288,12 +288,9 @@ jitter(N) ->
     couch_rand:uniform(Range).
 
 scan_local_db(Server, DbSuffix) when is_pid(Server) ->
-    case couch_db:open_int(DbSuffix, [?CTX, sys_db, nologifmissing]) of
-        {ok, Db} ->
-            gen_server:cast(Server, {resume_scan, DbSuffix}),
-            ok = couch_db:close(Db);
-        _Error ->
-            ok
+    case couch_server:exists(DbSuffix) of
+        true -> gen_server:cast(Server, {resume_scan, DbSuffix});
+        false -> ok
     end.
 
 is_design_doc({Change}) ->
@@ -311,7 +308,6 @@ is_design_doc_id(_) ->
 
 -ifdef(TEST).
 
--include_lib("eunit/include/eunit.hrl").
 -include_lib("couch/include/couch_eunit.hrl").
 
 -define(MOD, multidb_test_module).
@@ -328,32 +324,33 @@ couch_multidb_changes_test_() ->
             fun setup/0,
             fun teardown/1,
             [
-                t_handle_call_change(),
-                t_handle_call_change_filter_design_docs(),
-                t_handle_call_checkpoint_new(),
-                t_handle_call_checkpoint_existing(),
-                t_handle_info_created(),
-                t_handle_info_deleted(),
-                t_handle_info_updated(),
-                t_handle_info_other_event(),
-                t_handle_info_created_other_db(),
-                t_handle_info_scanner_exit_normal(),
-                t_handle_info_scanner_crashed(),
-                t_handle_info_event_server_exited(),
-                t_handle_info_unknown_pid_exited(),
-                t_handle_info_change_feed_exited(),
-                t_handle_info_change_feed_exited_and_need_rescan(),
-                t_spawn_changes_reader(),
-                t_changes_reader_cb_change(),
-                t_changes_reader_cb_stop(),
-                t_changes_reader_cb_other(),
-                t_handle_call_resume_scan_no_chfeed_no_ets_entry(),
-                t_handle_call_resume_scan_chfeed_no_ets_entry(),
-                t_handle_call_resume_scan_chfeed_ets_entry(),
-                t_handle_call_resume_scan_no_chfeed_ets_entry(),
-                t_start_link(),
-                t_start_link_no_ddocs(),
-                t_misc_gen_server_callbacks()
+                ?TDEF_FE(t_handle_call_change),
+                ?TDEF_FE(t_handle_call_change_filter_design_docs),
+                ?TDEF_FE(t_handle_call_checkpoint_new),
+                ?TDEF_FE(t_handle_call_checkpoint_existing),
+                ?TDEF_FE(t_handle_call_checkpoint_stale_changes_pid),
+                ?TDEF_FE(t_handle_info_created),
+                ?TDEF_FE(t_handle_info_deleted),
+                ?TDEF_FE(t_handle_info_updated),
+                ?TDEF_FE(t_handle_info_other_event),
+                ?TDEF_FE(t_handle_info_created_other_db),
+                ?TDEF_FE(t_handle_info_scanner_exit_normal),
+                ?TDEF_FE(t_handle_info_scanner_crashed),
+                ?TDEF_FE(t_handle_info_event_server_exited),
+                ?TDEF_FE(t_handle_info_unknown_pid_exited),
+                ?TDEF_FE(t_handle_info_change_feed_exited),
+                ?TDEF_FE(t_handle_info_change_feed_exited_and_need_rescan),
+                ?TDEF_FE(t_spawn_changes_reader),
+                ?TDEF_FE(t_changes_reader_cb_change),
+                ?TDEF_FE(t_changes_reader_cb_stop),
+                ?TDEF_FE(t_changes_reader_cb_other),
+                ?TDEF_FE(t_handle_call_resume_scan_no_chfeed_no_ets_entry),
+                ?TDEF_FE(t_handle_call_resume_scan_chfeed_no_ets_entry),
+                ?TDEF_FE(t_handle_call_resume_scan_chfeed_ets_entry),
+                ?TDEF_FE(t_handle_call_resume_scan_no_chfeed_ets_entry),
+                ?TDEF_FE(t_start_link),
+                ?TDEF_FE(t_start_link_no_ddocs),
+                ?TDEF_FE(t_misc_gen_server_callbacks)
             ]
         }
     }.
@@ -362,7 +359,7 @@ setup_all() ->
     mock_logs(),
     mock_callback_mod(),
     meck:expect(couch_event, register_all, 1, ok),
-    meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
+    test_util:start_applications([config]),
     meck:expect(mem3_util, ensure_exists, 1, {ok, dbs}),
     ChangesFun = meck:val(fun(_) -> ok end),
     meck:expect(couch_changes, handle_db_changes, 3, ChangesFun),
@@ -387,6 +384,7 @@ setup_all() ->
     EvtPid.
 
 teardown_all(EvtPid) ->
+    test_util:stop_applications([config]),
     unlink(EvtPid),
     exit(EvtPid, kill),
     meck:unload().
@@ -403,322 +401,285 @@ setup() ->
 teardown(_) ->
     ok.
 
-t_handle_call_change() ->
-    ?_test(begin
-        State = mock_state(),
-        Change = change_row(<<"blah">>),
-        handle_call_ok({change, ?DBNAME, Change}, State),
-        ?assert(meck:validate(?MOD)),
-        ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig]))
-    end).
-
-t_handle_call_change_filter_design_docs() ->
-    ?_test(begin
-        State0 = mock_state(),
-        State = State0#state{skip_ddocs = true},
-        Change = change_row(<<"_design/blah">>),
-        handle_call_ok({change, ?DBNAME, Change}, State),
-        ?assert(meck:validate(?MOD)),
-        ?assertNot(meck:called(?MOD, db_change, [?DBNAME, Change, zig]))
-    end).
-
-t_handle_call_checkpoint_new() ->
-    ?_test(begin
-        Tid = mock_ets(),
-        State = mock_state(Tid),
-        handle_call_ok({checkpoint, ?DBNAME, 1}, State),
-        ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)),
-        ets:delete(Tid)
-    end).
-
-t_handle_call_checkpoint_existing() ->
-    ?_test(begin
-        Tid = mock_ets(),
-        State = mock_state(Tid),
-        true = ets:insert(Tid, {?DBNAME, 1, true}),
-        handle_call_ok({checkpoint, ?DBNAME, 2}, State),
-        ?assertEqual([{?DBNAME, 2, true}], ets:tab2list(Tid)),
-        ets:delete(Tid)
-    end).
-
-t_handle_info_created() ->
-    ?_test(begin
-        Tid = mock_ets(),
-        State = mock_state(Tid),
-        handle_info_check({'$couch_event', ?DBNAME, created}, State),
-        ?assert(meck:validate(?MOD)),
-        ?assert(meck:called(?MOD, db_created, [?DBNAME, zig]))
-    end).
-
-t_handle_info_deleted() ->
-    ?_test(begin
-        State = mock_state(),
-        handle_info_check({'$couch_event', ?DBNAME, deleted}, State),
-        ?assert(meck:validate(?MOD)),
-        ?assert(meck:called(?MOD, db_deleted, [?DBNAME, zig]))
-    end).
-
-t_handle_info_updated() ->
-    ?_test(begin
-        Tid = mock_ets(),
-        State = mock_state(Tid),
-        handle_info_check({'$couch_event', ?DBNAME, updated}, State),
-        ?assert(meck:validate(?MOD)),
-        ?assert(meck:called(?MOD, db_found, [?DBNAME, zig]))
-    end).
-
-t_handle_info_other_event() ->
-    ?_test(begin
-        State = mock_state(),
-        handle_info_check({'$couch_event', ?DBNAME, somethingelse}, State),
-        ?assertNot(meck:called(?MOD, db_created, [?DBNAME, somethingelse])),
-        ?assertNot(meck:called(?MOD, db_deleted, [?DBNAME, somethingelse])),
-        ?assertNot(meck:called(?MOD, db_found, [?DBNAME, somethingelse]))
-    end).
-
-t_handle_info_created_other_db() ->
-    ?_test(begin
-        State = mock_state(),
-        handle_info_check({'$couch_event', <<"otherdb">>, created}, State),
-        ?assertNot(meck:called(?MOD, db_created, [?DBNAME, zig]))
-    end).
-
-t_handle_info_scanner_exit_normal() ->
-    ?_test(begin
-        Res = handle_info({'EXIT', spid, normal}, mock_state()),
-        ?assertMatch({noreply, _}, Res),
-        {noreply, RState} = Res,
-        ?assertEqual(nil, RState#state.scanner)
-    end).
-
-t_handle_info_scanner_crashed() ->
-    ?_test(begin
-        Res = handle_info({'EXIT', spid, oops}, mock_state()),
-        ?assertMatch({stop, {scanner_died, oops}, _State}, Res)
-    end).
-
-t_handle_info_event_server_exited() ->
-    ?_test(begin
-        Res = handle_info({'DOWN', esref, type, espid, reason}, mock_state()),
-        ?assertMatch({stop, {couch_event_server_died, reason}, _}, Res)
-    end).
-
-t_handle_info_unknown_pid_exited() ->
-    ?_test(begin
-        State0 = mock_state(),
-        Res0 = handle_info({'EXIT', somepid, normal}, State0),
-        ?assertMatch({noreply, State0}, Res0),
-        State1 = mock_state(),
-        Res1 = handle_info({'EXIT', somepid, oops}, State1),
-        ?assertMatch({stop, {unexpected_exit, somepid, oops}, State1}, Res1)
-    end).
-
-t_handle_info_change_feed_exited() ->
-    ?_test(begin
-        Tid0 = mock_ets(),
-        State0 = mock_state(Tid0, cpid),
-        Res0 = handle_info({'EXIT', cpid, normal}, State0),
-        ?assertMatch({noreply, _}, Res0),
-        {noreply, RState0} = Res0,
-        ?assertEqual([], RState0#state.pids),
-        ets:delete(Tid0),
-        Tid1 = mock_ets(),
-        State1 = mock_state(Tid1, cpid),
-        Res1 = handle_info({'EXIT', cpid, oops}, State1),
-        ?assertMatch({noreply, _}, Res1),
-        {noreply, RState1} = Res1,
-        ?assertEqual([], RState1#state.pids),
-        ets:delete(Tid1)
-    end).
-
-t_handle_info_change_feed_exited_and_need_rescan() ->
-    ?_test(begin
-        Tid = mock_ets(),
-        true = ets:insert(Tid, {?DBNAME, 1, true}),
-        State = mock_state(Tid, cpid),
-        Res = handle_info({'EXIT', cpid, normal}, State),
-        ?assertMatch({noreply, _}, Res),
-        {noreply, RState} = Res,
-        % rescan flag should have been reset to false
-        ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)),
-        % a mock change feed process should be running
-        [{?DBNAME, Pid}] = RState#state.pids,
-        ?assert(is_pid(Pid)),
-        ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
-        ?assertEqual({self(), ?DBNAME}, ChArgs),
-        ets:delete(Tid)
-    end).
-
-t_spawn_changes_reader() ->
-    ?_test(begin
-        Pid = start_changes_reader(?DBNAME, 3),
-        ?assert(erlang:is_process_alive(Pid)),
-        ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
-        ?assertEqual({self(), ?DBNAME}, ChArgs),
-        ?assert(meck:validate(couch_db)),
-        ?assert(meck:validate(couch_changes)),
-        ?assert(meck:called(couch_db, open_int, [?DBNAME, [?CTX, sys_db]])),
-        ?assert(
-            meck:called(couch_changes, handle_db_changes, [
-                #changes_args{
-                    include_docs = true,
-                    since = 3,
-                    feed = "normal",
-                    timeout = infinity
-                },
-                {json_req, null},
-                db
-            ])
-        )
-    end).
-
-t_changes_reader_cb_change() ->
-    ?_test(begin
-        {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
-        Change = change_row(<<"blah">>),
-        ChArg = {change, Change, ignore},
-        {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}),
-        ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig])),
-        unlink(Pid),
-        exit(Pid, kill)
-    end).
-
-t_changes_reader_cb_stop() ->
-    ?_test(begin
-        {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
-        ChArg = {stop, 11},
-        {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}),
-        % We checkpoint on stop, check if checkpointed at correct sequence
-        #state{tid = Tid} = sys:get_state(Pid),
-        ?assertEqual([{?DBNAME, 11, false}], ets:tab2list(Tid)),
-        unlink(Pid),
-        exit(Pid, kill)
-    end).
-
-t_changes_reader_cb_other() ->
-    ?_assertEqual(acc, changes_reader_cb(other, chtype, acc)).
-
-t_handle_call_resume_scan_no_chfeed_no_ets_entry() ->
-    ?_test(begin
-        Tid = mock_ets(),
-        State = mock_state(Tid),
-        RState = resume_scan(?DBNAME, State),
-        % Check if inserted checkpoint entry in ets starting at 0
-        ?assertEqual([{?DBNAME, 0, false}], ets:tab2list(Tid)),
-        % Check if called db_found callback
-        ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])),
-        % Check if started a change reader
-        [{?DBNAME, Pid}] = RState#state.pids,
-        ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
-        ?assertEqual({self(), ?DBNAME}, ChArgs),
-        ?assert(
-            meck:called(couch_changes, handle_db_changes, [
-                #changes_args{
-                    include_docs = true,
-                    since = 0,
-                    feed = "normal",
-                    timeout = infinity
-                },
-                {json_req, null},
-                db
-            ])
-        ),
-        ets:delete(Tid)
-    end).
-
-t_handle_call_resume_scan_chfeed_no_ets_entry() ->
-    ?_test(begin
-        Tid = mock_ets(),
-        Pid = start_changes_reader(?DBNAME, 0),
-        State = mock_state(Tid, Pid),
-        resume_scan(?DBNAME, State),
-        % Check ets checkpoint is set to 0 and rescan = true
-        ?assertEqual([{?DBNAME, 0, true}], ets:tab2list(Tid)),
-        ets:delete(Tid),
-        kill_mock_changes_reader_and_get_its_args(Pid)
-    end).
-
-t_handle_call_resume_scan_chfeed_ets_entry() ->
-    ?_test(begin
-        Tid = mock_ets(),
-        true = ets:insert(Tid, [{?DBNAME, 2, false}]),
-        Pid = start_changes_reader(?DBNAME, 1),
-        State = mock_state(Tid, Pid),
-        resume_scan(?DBNAME, State),
-        % Check ets checkpoint is set to same endseq but rescan = true
-        ?assertEqual([{?DBNAME, 2, true}], ets:tab2list(Tid)),
-        ets:delete(Tid),
-        kill_mock_changes_reader_and_get_its_args(Pid)
-    end).
-
-t_handle_call_resume_scan_no_chfeed_ets_entry() ->
-    ?_test(begin
-        Tid = mock_ets(),
-        true = ets:insert(Tid, [{?DBNAME, 1, true}]),
-        State = mock_state(Tid),
-        RState = resume_scan(?DBNAME, State),
-        % Check if reset rescan to false but kept same endseq
-        ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)),
-        % Check if started a change reader
-        [{?DBNAME, Pid}] = RState#state.pids,
-        ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
-        ?assertEqual({self(), ?DBNAME}, ChArgs),
-        ?assert(
-            meck:called(couch_changes, handle_db_changes, [
-                #changes_args{
-                    include_docs = true,
-                    since = 1,
-                    feed = "normal",
-                    timeout = infinity
-                },
-                {json_req, null},
-                db
-            ])
-        ),
-        ets:delete(Tid)
-    end).
-
-t_start_link() ->
-    ?_test(begin
-        {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, []),
-        ?assert(is_pid(Pid)),
-        ?assertMatch(
-            #state{
-                mod = ?MOD,
-                suffix = ?SUFFIX,
-                ctx = nil,
-                pids = [],
-                skip_ddocs = false
+t_handle_call_change(_) ->
+    State = mock_state(),
+    Change = change_row(<<"blah">>),
+    handle_call_ok({change, ?DBNAME, Change}, State),
+    ?assert(meck:validate(?MOD)),
+    ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig])).
+
+t_handle_call_change_filter_design_docs(_) ->
+    State0 = mock_state(),
+    State = State0#state{skip_ddocs = true},
+    Change = change_row(<<"_design/blah">>),
+    handle_call_ok({change, ?DBNAME, Change}, State),
+    ?assert(meck:validate(?MOD)),
+    ?assertNot(meck:called(?MOD, db_change, [?DBNAME, Change, zig])).
+
+t_handle_call_checkpoint_new(_) ->
+    Tid = mock_ets(),
+    State = mock_state(Tid, cpid),
+    handle_call_ok({checkpoint, ?DBNAME, 1}, cpid, State),
+    ?assertEqual([{?DBNAME, 1, false, cpid}], ets:tab2list(Tid)),
+    ets:delete(Tid).
+
+t_handle_call_checkpoint_existing(_) ->
+    Tid = mock_ets(),
+    State = mock_state(Tid, cpid),
+    handle_call_ok({checkpoint, ?DBNAME, 2}, cpid, State),
+    ?assertEqual([{?DBNAME, 2, false, cpid}], ets:tab2list(Tid)),
+    ets:delete(Tid).
+
+t_handle_call_checkpoint_stale_changes_pid(_) ->
+    Tid = mock_ets(),
+    State = mock_state(Tid, cpid),
+    handle_call_ok({checkpoint, ?DBNAME, 42}, other, State),
+    ?assertEqual([{?DBNAME, 0, false, cpid}], ets:tab2list(Tid)),
+    ets:delete(Tid).
+
+t_handle_info_created(_) ->
+    Tid = mock_ets(),
+    State = mock_state(Tid),
+    handle_info_check({'$couch_event', ?DBNAME, created}, State),
+    ?assert(meck:validate(?MOD)),
+    ?assert(meck:called(?MOD, db_created, [?DBNAME, zig])).
+
+t_handle_info_deleted(_) ->
+    State = mock_state(),
+    handle_info_check({'$couch_event', ?DBNAME, deleted}, State),
+    ?assert(meck:validate(?MOD)),
+    ?assert(meck:called(?MOD, db_deleted, [?DBNAME, zig])).
+
+t_handle_info_updated(_) ->
+    Tid = mock_ets(),
+    State = mock_state(Tid),
+    handle_info_check({'$couch_event', ?DBNAME, updated}, State),
+    ?assert(meck:validate(?MOD)),
+    ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])).
+
+t_handle_info_other_event(_) ->
+    State = mock_state(),
+    handle_info_check({'$couch_event', ?DBNAME, somethingelse}, State),
+    ?assertNot(meck:called(?MOD, db_created, [?DBNAME, somethingelse])),
+    ?assertNot(meck:called(?MOD, db_deleted, [?DBNAME, somethingelse])),
+    ?assertNot(meck:called(?MOD, db_found, [?DBNAME, somethingelse])).
+
+t_handle_info_created_other_db(_) ->
+    State = mock_state(),
+    handle_info_check({'$couch_event', <<"otherdb">>, created}, State),
+    ?assertNot(meck:called(?MOD, db_created, [?DBNAME, zig])).
+
+t_handle_info_scanner_exit_normal(_) ->
+    Res = handle_info({'EXIT', spid, normal}, mock_state()),
+    ?assertMatch({noreply, _}, Res),
+    {noreply, RState} = Res,
+    ?assertEqual(nil, RState#state.scanner).
+
+t_handle_info_scanner_crashed(_) ->
+    Res = handle_info({'EXIT', spid, oops}, mock_state()),
+    ?assertMatch({stop, {scanner_died, oops}, _State}, Res).
+
+t_handle_info_event_server_exited(_) ->
+    Res = handle_info({'DOWN', esref, type, espid, reason}, mock_state()),
+    ?assertMatch({stop, {couch_event_server_died, reason}, _}, Res).
+
+t_handle_info_unknown_pid_exited(_) ->
+    State0 = mock_state(),
+    Res0 = handle_info({'EXIT', somepid, normal}, State0),
+    ?assertMatch({stop, {unexpected_exit, somepid, normal}, State0}, Res0),
+    State1 = mock_state(),
+    Res1 = handle_info({'EXIT', somepid, oops}, State1),
+    ?assertMatch({stop, {unexpected_exit, somepid, oops}, State1}, Res1).
+
+t_handle_info_change_feed_exited(_) ->
+    Tid0 = mock_ets(),
+    State0 = mock_state(Tid0, cpid),
+    Res0 = handle_info({'EXIT', cpid, normal}, State0),
+    ?assertMatch({noreply, _}, Res0),
+    {noreply, RState0} = Res0,
+    ?assertEqual(#{}, RState0#state.pids),
+    ets:delete(Tid0),
+    Tid1 = mock_ets(),
+    State1 = mock_state(Tid1, cpid),
+    Res1 = handle_info({'EXIT', cpid, oops}, State1),
+    ?assertMatch({noreply, _}, Res1),
+    {noreply, RState1} = Res1,
+    ?assertEqual(#{}, RState1#state.pids),
+    ets:delete(Tid1).
+
+t_handle_info_change_feed_exited_and_need_rescan(_) ->
+    Tid = mock_ets(),
+    State = mock_state(Tid, cpid),
+    true = ets:insert(Tid, {?DBNAME, 1, true, cpid}),
+    Res = handle_info({'EXIT', cpid, normal}, State),
+    ?assertMatch({noreply, _}, Res),
+    {noreply, RState} = Res,
+    % a mock change feed process should be running
+    [{Pid, ?DBNAME}] = maps:to_list(RState#state.pids),
+    ?assert(is_pid(Pid)),
+    % rescan flag should have been reset to false
+    ?assertEqual([{?DBNAME, 1, false, Pid}], ets:tab2list(Tid)),
+    ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
+    ?assertEqual({self(), ?DBNAME}, ChArgs),
+    ets:delete(Tid).
+
+t_spawn_changes_reader(_) ->
+    Pid = start_changes_reader(?DBNAME, 3),
+    ?assert(erlang:is_process_alive(Pid)),
+    ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
+    ?assertEqual({self(), ?DBNAME}, ChArgs),
+    ?assert(meck:validate(couch_db)),
+    ?assert(meck:validate(couch_changes)),
+    ?assert(meck:called(couch_db, open_int, [?DBNAME, [?CTX, sys_db]])),
+    ?assert(
+        meck:called(couch_changes, handle_db_changes, [
+            #changes_args{
+                include_docs = true,
+                since = 3,
+                feed = "normal",
+                timeout = infinity
             },
-            sys:get_state(Pid)
-        ),
-        unlink(Pid),
-        exit(Pid, kill),
-        ?assert(meck:called(couch_event, register_all, [Pid]))
-    end).
-
-t_start_link_no_ddocs() ->
-    ?_test(begin
-        {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, [skip_ddocs]),
-        ?assert(is_pid(Pid)),
-        ?assertMatch(
-            #state{
-                mod = ?MOD,
-                suffix = ?SUFFIX,
-                ctx = nil,
-                pids = [],
-                skip_ddocs = true
+            {json_req, null},
+            db
+        ])
+    ).
+
+t_changes_reader_cb_change(_) ->
+    {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
+    Change = change_row(<<"blah">>),
+    ChArg = {change, Change, ignore},
+    {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}),
+    ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig])),
+    unlink(Pid),
+    exit(Pid, kill).
+
+t_changes_reader_cb_stop(_) ->
+    {ok, ServerPid} = start_link(?SUFFIX, ?MOD, zig, []),
+    #state{tid = Tid} = sys:get_state(ServerPid),
+    ChPid = self(),
+    ets:insert(Tid, {?DBNAME, 1, false, ChPid}),
+    sys:replace_state(ServerPid, fun(#state{} = OldSt) ->
+        OldSt#state{pids = #{ChPid => ?DBNAME}}
+    end),
+    ChArg = {stop, 11},
+    {ServerPid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {ServerPid, 
?DBNAME}),
+    % We checkpoint on stop, check if checkpointed at correct sequence
+    #state{tid = Tid, pids = Pids} = sys:get_state(ServerPid),
+    ?assertMatch(#{ChPid := ?DBNAME}, Pids),
+    ?assertEqual([{?DBNAME, 11, false, ChPid}], ets:tab2list(Tid)),
+    unlink(ServerPid),
+    exit(ServerPid, kill).
+
+t_changes_reader_cb_other(_) ->
+    ?assertEqual(acc, changes_reader_cb(other, chtype, acc)).
+
+t_handle_call_resume_scan_no_chfeed_no_ets_entry(_) ->
+    Tid = mock_ets(),
+    State = mock_state(Tid),
+    RState = resume_scan(?DBNAME, State),
+    % Check if called db_found callback
+    ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])),
+    % Check if started a change reader
+    [{Pid, ?DBNAME}] = maps:to_list(RState#state.pids),
+    % Check if inserted checkpoint entry in ets starting at 0
+    ?assertEqual([{?DBNAME, 0, false, Pid}], ets:tab2list(Tid)),
+    ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
+    ?assertEqual({self(), ?DBNAME}, ChArgs),
+    ?assert(
+        meck:called(couch_changes, handle_db_changes, [
+            #changes_args{
+                include_docs = true,
+                since = 0,
+                feed = "normal",
+                timeout = infinity
+            },
+            {json_req, null},
+            db
+        ])
+    ),
+    ets:delete(Tid).
+
+t_handle_call_resume_scan_chfeed_no_ets_entry(_) ->
+    Tid = mock_ets(),
+    Pid = start_changes_reader(?DBNAME, 0),
+    State = mock_state(Tid, Pid),
+    resume_scan(?DBNAME, State),
+    % Check ets checkpoint is set to 0 and rescan = true
+    ?assertEqual([{?DBNAME, 0, true, Pid}], ets:tab2list(Tid)),
+    ets:delete(Tid),
+    kill_mock_changes_reader_and_get_its_args(Pid).
+
+t_handle_call_resume_scan_chfeed_ets_entry(_) ->
+    Tid = mock_ets(),
+    Pid = start_changes_reader(?DBNAME, 1),
+    State = mock_state(Tid, Pid),
+    true = ets:insert(Tid, [{?DBNAME, 2, false, Pid}]),
+    resume_scan(?DBNAME, State),
+    % Check ets checkpoint is set to same endseq but rescan = true
+    ?assertEqual([{?DBNAME, 2, true, Pid}], ets:tab2list(Tid)),
+    ets:delete(Tid),
+    kill_mock_changes_reader_and_get_its_args(Pid).
+
+t_handle_call_resume_scan_no_chfeed_ets_entry(_) ->
+    Tid = mock_ets(),
+    true = ets:insert(Tid, [{?DBNAME, 1, true, undefined}]),
+    State = mock_state(Tid),
+    RState = resume_scan(?DBNAME, State),
+    % Check if started a change reader
+    [{Pid, ?DBNAME}] = maps:to_list(RState#state.pids),
+    % Check if reset rescan to false but kept same endseq
+    ?assertEqual([{?DBNAME, 1, false, Pid}], ets:tab2list(Tid)),
+    ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
+    ?assertEqual({self(), ?DBNAME}, ChArgs),
+    ?assert(
+        meck:called(couch_changes, handle_db_changes, [
+            #changes_args{
+                include_docs = true,
+                since = 1,
+                feed = "normal",
+                timeout = infinity
             },
-            sys:get_state(Pid)
-        ),
-        unlink(Pid),
-        exit(Pid, kill)
-    end).
+            {json_req, null},
+            db
+        ])
+    ),
+    ets:delete(Tid).
+
+t_start_link(_) ->
+    {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, []),
+    ?assert(is_pid(Pid)),
+    ?assertMatch(
+        #state{
+            mod = ?MOD,
+            suffix = ?SUFFIX,
+            ctx = nil,
+            pids = #{},
+            skip_ddocs = false
+        },
+        sys:get_state(Pid)
+    ),
+    unlink(Pid),
+    exit(Pid, kill),
+    ?assert(meck:called(couch_event, register_all, [Pid])).
+
+t_start_link_no_ddocs(_) ->
+    {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, [skip_ddocs]),
+    ?assert(is_pid(Pid)),
+    ?assertMatch(
+        #state{
+            mod = ?MOD,
+            suffix = ?SUFFIX,
+            ctx = nil,
+            pids = #{},
+            skip_ddocs = true
+        },
+        sys:get_state(Pid)
+    ),
+    unlink(Pid),
+    exit(Pid, kill).
 
-t_misc_gen_server_callbacks() ->
-    ?_test(begin
-        ?assertEqual(ok, terminate(reason, state))
-    end).
+t_misc_gen_server_callbacks(_) ->
+    ?assertEqual(ok, terminate(reason, state)).
 
 scan_dbs_test_() ->
     {
@@ -734,45 +695,60 @@ scan_dbs_test_() ->
             fabric:delete_db(GlobalDb, [?CTX]),
             test_util:stop_couch(Ctx)
         end,
-        {with, [
-            fun t_find_shard/1,
-            fun t_shard_not_found/1,
-            fun t_pass_local/1,
-            fun t_fail_local/1
-        ]}
+        with([
+            ?TDEF(t_find_shard),
+            ?TDEF(t_shard_not_found),
+            ?TDEF(t_pass_local),
+            ?TDEF(t_fail_local),
+            ?TDEF(t_scan_all_dbs)
+        ])
     }.
 
 t_find_shard({_, DbName, _}) ->
-    ?_test(begin
-        ?assertEqual(2, length(local_shards(DbName)))
-    end).
+    ?assertEqual(2, length(local_shards(DbName))).
 
 t_shard_not_found(_) ->
-    ?_test(begin
-        ?assertEqual([], local_shards(?tempdb()))
-    end).
+    ?assertEqual([], local_shards(?tempdb())).
 
 t_pass_local({_, _, LocalDb}) ->
-    ?_test(begin
-        scan_local_db(self(), LocalDb),
-        receive
-            {'$gen_cast', Msg} ->
-                ?assertEqual(Msg, {resume_scan, LocalDb})
-        after 0 ->
-            ?assert(false)
-        end
-    end).
+    scan_local_db(self(), LocalDb),
+    receive
+        {'$gen_cast', Msg} ->
+            ?assertEqual(Msg, {resume_scan, LocalDb})
+    after 0 ->
+        ?assert(false)
+    end.
 
 t_fail_local({_, _, LocalDb}) ->
-    ?_test(begin
-        scan_local_db(self(), <<"some_other_db">>),
-        receive
-            {'$gen_cast', Msg} ->
-                ?assertNotEqual(Msg, {resume_scan, LocalDb})
-        after 0 ->
-            ?assert(true)
-        end
-    end).
+    scan_local_db(self(), <<"some_other_db">>),
+    receive
+        {'$gen_cast', Msg} ->
+            ?assertNotEqual(Msg, {resume_scan, LocalDb})
+    after 0 ->
+        ?assert(true)
+    end.
+
+t_scan_all_dbs({_, GlobalDb, _}) ->
+    scan_all_dbs(self(), GlobalDb),
+    ?assertMatch(
+        [
+            {'$gen_cast', {resume_scan, <<"shards/00000000-7fffffff/", _/binary>>}},
+            {'$gen_cast', {resume_scan, <<"shards/80000000-ffffffff/", _/binary>>}}
+        ],
+        lists:sort(flush([]))
+    ).
+
+flush(Acc) ->
+    receive
+        Msg ->
+            NewMsg = [Msg | Acc],
+            case length(NewMsg) >= 2 of
+                true -> NewMsg;
+                false -> flush(NewMsg)
+            end
+    after 1000 ->
+        Acc
+    end.
 
 % Test helper functions
 
@@ -826,7 +802,7 @@ mock_state() ->
         suffix = ?SUFFIX,
         event_server = esref,
         scanner = spid,
-        pids = []
+        pids = #{}
     }.
 
 mock_state(Ets) ->
@@ -835,7 +811,8 @@ mock_state(Ets) ->
 
 mock_state(Ets, Pid) ->
     State = mock_state(Ets),
-    State#state{pids = [{?DBNAME, Pid}]}.
+    ets:insert(State#state.tid, {?DBNAME, 0, false, Pid}),
+    State#state{pids = #{Pid => ?DBNAME}}.
 
 change_row(Id) when is_binary(Id) ->
     {[
@@ -846,7 +823,11 @@ change_row(Id) when is_binary(Id) ->
     ]}.
 
 handle_call_ok(Msg, State) ->
-    ?assertMatch({reply, ok, _}, handle_call(Msg, from, State)).
+    handle_call_ok(Msg, from, State).
+
+handle_call_ok(Msg, FromPid, State) ->
+    FromTag = make_ref(),
+    ?assertMatch({reply, ok, _}, handle_call(Msg, {FromPid, FromTag}, State)).
 
 handle_info_check(Msg, State) ->
     ?assertMatch({noreply, _}, handle_info(Msg, State)).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl
index 192d113c6..76450a692 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl
@@ -101,8 +101,8 @@ t_replicator_doc_state_fields_test_() ->
         fun setup_prefixed_replicator_db/0,
         fun teardown/1,
         with([
-            ?TDEF(t_doc_fields_are_updated, 10),
-            ?TDEF(t_doc_fields_are_ignored, 10)
+            ?TDEF(t_doc_fields_are_updated, 15),
+            ?TDEF(t_doc_fields_are_ignored, 15)
         ])
     }.
 
@@ -112,8 +112,8 @@ t_replicator_doc_state_fields_update_docs_true_test_() ->
         fun setup_prefixed_replicator_db_with_update_docs_true/0,
         fun teardown/1,
         with([
-            ?TDEF(t_doc_fields_are_updated, 10),
-            ?TDEF(t_doc_fields_are_ignored, 10)
+            ?TDEF(t_doc_fields_are_updated, 15),
+            ?TDEF(t_doc_fields_are_ignored, 15)
         ])
     }.
 
@@ -135,7 +135,7 @@ t_scheduler_docs_total_rows({_Ctx, {RepDb, Source, Target}}) ->
                 {_, #{}} -> wait
             end
         end,
-        10000,
+        14000,
         1000
     ),
     Docs = maps:get(<<"docs">>, Body),
@@ -183,7 +183,7 @@ t_doc_fields_are_updated({_Ctx, {RepDb, Source, Target}}) ->
                 {_, #{}} -> wait
             end
         end,
-        10000,
+        14000,
         1000
     ),
     ?assertMatch(
@@ -225,7 +225,7 @@ t_doc_fields_are_ignored({_Ctx, {RepDb, Source, Target}}) ->
                 {_, #{}} -> wait
             end
         end,
-        10000,
+        14000,
         1000
     ),
     ?assertMatch(


Reply via email to