This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
new 641b39373 Optimize and clean up couch_multidb_changes
641b39373 is described below
commit 641b39373d540fbb8eb4c31700ff88d2503a7b98
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Dec 12 12:11:45 2023 -0500
Optimize and clean up couch_multidb_changes
couch_multidb_changes is in charge of monitoring changes to multiple
databases. It is what drives the couch_replicator application when users
update replication docs. It starts off by scanning all the local db shards
and launching changes feeds for each of them. After it processes each
changes feed, it checkpoints where it stopped. As the shards are updated, it
starts changes feeds for those updated shards and checkpoints again. The
checkpoints are kept in an ets table, and the main logic which decides when
to start a changes feed for a particular shard is in the resume_scan/2
function.
This change makes a few small optimizations:
* Use a map to track Pid -> DbName mappings. This avoids an O(N)
operation when looking up existing change feed processes. So `pids` is
switched to be a map of `#{Pid => DbName}`, and the ets table has a 4th
tuple member to keep track of `DbName -> Pid` mappings.
* Previously, when the plain "suffix" dbname (just <<"_replicator">>) was
found, we tried to open and close it to see if it existed. Change to use
`couch_server:exists/1` instead.
There are also a few clean-ups:
* Update tests to use ?TDEF_FE/?TDEF macros. This shortens them a bit and
saves one indentation level.
* Increase test coverage from 86% to 96%. To make it easier to create more
test scenarios, switch to using a public ets table instead of a protected one.
---
src/couch/src/couch_multidb_changes.erl | 835 ++++++++++-----------
.../couch_replicator_scheduler_docs_tests.erl | 14 +-
2 files changed, 415 insertions(+), 434 deletions(-)
diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl
index a9b4c4fb6..ecb7e1bfe 100644
--- a/src/couch/src/couch_multidb_changes.erl
+++ b/src/couch/src/couch_multidb_changes.erl
@@ -46,7 +46,7 @@
suffix :: binary(),
event_server :: reference(),
scanner :: nil | pid(),
- pids :: [{binary(), pid()}],
+ pids :: #{pid() => binary()}, % change feed pid => shard DbName
skip_ddocs :: boolean()
}).
@@ -85,36 +85,34 @@ init([DbSuffix, Module, Context, Opts]) ->
process_flag(trap_exit, true),
Server = self(),
{ok, #state{
- tid = ets:new(?MODULE, [set, protected]),
+ tid = ets:new(?MODULE, [set, public]), % public (was protected) so tests can seed checkpoint rows directly
mod = Module,
ctx = Context,
suffix = DbSuffix,
event_server = register_with_event_server(Server),
scanner = spawn_link(fun() -> scan_all_dbs(Server, DbSuffix) end),
- pids = [],
+ pids = #{}, % #{ChangeFeedPid => DbName}
skip_ddocs = proplists:is_defined(skip_ddocs, Opts)
}}.
terminate(_Reason, _State) ->
ok.
-handle_call(
- {change, DbName, Change},
- _From,
- #state{skip_ddocs = SkipDDocs, mod = Mod, ctx = Ctx} = State
-) ->
+handle_call({change, DbName, Change}, _From, #state{} = State) -> % forward to Mod:db_change/3 unless a skipped ddoc
+ #state{skip_ddocs = SkipDDocs, mod = Mod, ctx = Ctx} = State,
case {SkipDDocs, is_design_doc(Change)} of
{true, true} ->
{reply, ok, State};
{_, _} ->
{reply, ok, State#state{ctx = Mod:db_change(DbName, Change, Ctx)}}
end;
-handle_call({checkpoint, DbName, EndSeq}, _From, #state{tid = Ets} = State) ->
+handle_call({checkpoint, DbName, EndSeq}, {Pid, _Tag} = _From, #state{tid = Ets} = State) ->
case ets:lookup(Ets, DbName) of
- [] ->
- true = ets:insert(Ets, {DbName, EndSeq, false});
- [{DbName, _OldSeq, Rescan}] ->
- true = ets:insert(Ets, {DbName, EndSeq, Rescan})
+ [{DbName, _OldSeq, Rescan, Pid}] -> % Pid is bound from From: only the owning feed may checkpoint
+ true = ets:insert(Ets, {DbName, EndSeq, Rescan, Pid});
+ _ ->
+ % Ignore stale checkpoints or checkpoints from unknown change feeds
+ ok
end,
{reply, ok, State}.
@@ -134,10 +132,10 @@ handle_info({'EXIT', From, normal}, #state{scanner = From} = State) ->
{noreply, State#state{scanner = nil}};
handle_info({'EXIT', From, Reason}, #state{scanner = From} = State) ->
{stop, {scanner_died, Reason}, State};
-handle_info({'EXIT', From, Reason}, #state{pids = Pids} = State) ->
+handle_info({'EXIT', From, Reason}, #state{pids = #{} = Pids} = State) ->
couch_log:debug("~p change feed exited ~p", [State#state.suffix, From]),
- case lists:keytake(From, 2, Pids) of
- {value, {DbName, From}, NewPids} ->
+ case maps:take(From, Pids) of
+ {DbName, NewPids} ->
if
Reason == normal ->
ok;
@@ -147,14 +145,18 @@ handle_info({'EXIT', From, Reason}, #state{pids = Pids} = State) ->
end,
NewState = State#state{pids = NewPids},
case ets:lookup(State#state.tid, DbName) of
- [{DbName, _EndSeq, true}] ->
+ [{DbName, _EndSeq, true, From}] ->
+ % Match the From pid explicitly and then clear it
+ % The pid is at 4th position in the ets object
+ ets:update_element(NewState#state.tid, DbName, {4, undefined}),
{noreply, resume_scan(DbName, NewState)};
+ [{DbName, _EndSeq, false, From}] ->
+ ets:update_element(NewState#state.tid, DbName, {4, undefined}), % no rescan pending: just release the pid slot
+ {noreply, NewState};
_ ->
{noreply, NewState}
end;
- false when Reason == normal ->
- {noreply, State};
- false ->
+ error -> % neither the scanner nor a tracked change feed
Fmt = "~s(~p) : Unknown pid ~w died :: ~w",
couch_log:error(Fmt, [?MODULE, State#state.suffix, From, Reason]),
{stop, {unexpected_exit, From, Reason}, State}
@@ -182,34 +184,28 @@ db_callback(_Other, _DbName, State) ->
State.
-spec resume_scan(binary(), #state{}) -> #state{}.
-resume_scan(DbName, #state{pids = Pids, tid = Ets} = State) ->
- case {lists:keyfind(DbName, 1, Pids), ets:lookup(Ets, DbName)} of
- {{DbName, _}, []} ->
- % Found existing change feed, but not entry in ETS
- % Flag a need to rescan from begining
- true = ets:insert(Ets, {DbName, 0, true}),
- State;
- {{DbName, _}, [{DbName, EndSeq, _}]} ->
+resume_scan(DbName, #state{pids = #{} = Pids, tid = Ets} = State) -> % ets rows: {DbName, EndSeq, Rescan, FeedPid | undefined}
+ case ets:lookup(Ets, DbName) of
+ [{DbName, EndSeq, _, undefined}] ->
+ % No existing change feed running. Found existing checkpoint.
+ % Start a new change reader from last checkpoint.
+ Pid = start_changes_reader(DbName, EndSeq),
+ true = ets:insert(Ets, {DbName, EndSeq, false, Pid}),
+ State#state{pids = Pids#{Pid => DbName}};
+ [{DbName, EndSeq, _, Pid}] ->
% Found existing change feed and entry in ETS
% Flag a need to rescan from last ETS checkpoint
- true = ets:insert(Ets, {DbName, EndSeq, true}),
+ true = ets:insert(Ets, {DbName, EndSeq, true, Pid}),
State;
- {false, []} ->
- % No existing change feed running. No entry in ETS.
- % This is first time seeing this db shard.
- % Notify user with a found callback. Insert checkpoint
- % entry in ETS to start from 0. And start a change feed.
- true = ets:insert(Ets, {DbName, 0, false}),
+ [] ->
+ % No entry in ETS. This is the first time seeing this db shard.
+ % Start a change feed from 0, record its pid in a new checkpoint
+ % entry, then notify the user with the db_found callback.
+ Pid = start_changes_reader(DbName, 0),
+ true = ets:insert(Ets, {DbName, 0, false, Pid}),
Mod = State#state.mod,
Ctx = Mod:db_found(DbName, State#state.ctx),
- Pid = start_changes_reader(DbName, 0),
- State#state{ctx = Ctx, pids = [{DbName, Pid} | Pids]};
- {false, [{DbName, EndSeq, _}]} ->
- % No existing change feed running. Found existing checkpoint.
- % Start a new change reader from last checkpoint.
- true = ets:insert(Ets, {DbName, EndSeq, false}),
- Pid = start_changes_reader(DbName, EndSeq),
- State#state{pids = [{DbName, Pid} | Pids]}
+ State#state{ctx = Ctx, pids = Pids#{Pid => DbName}}
end.
start_changes_reader(DbName, Since) ->
@@ -237,9 +233,7 @@ changes_reader_cb(_, _, Acc) ->
scan_all_dbs(Server, DbSuffix) when is_pid(Server) ->
ok = scan_local_db(Server, DbSuffix),
- {ok, Db} = mem3_util:ensure_exists(
- config:get("mem3", "shards_db", "_dbs")
- ),
+ {ok, Db} = mem3_util:ensure_exists(shards_db()), % shards_db/0 reads [mem3] shards_db, default "_dbs"
ChangesFun = couch_changes:handle_db_changes(#changes_args{}, nil, Db),
ChangesFun({fun scan_changes_cb/3, {Server, DbSuffix, 1}}),
couch_db:close(Db).
@@ -251,7 +245,7 @@ scan_changes_cb({change, {Change}, _}, _, {_Server, DbSuffix, _Count} = Acc) ->
Acc;
_Else ->
NameMatch = DbSuffix =:= couch_db:dbname_suffix(DbName),
- case {NameMatch, couch_replicator_utils:is_deleted(Change)} of
+ case {NameMatch, is_deleted(Change)} of
{false, _} ->
Acc;
{true, true} ->
@@ -264,6 +258,12 @@ scan_changes_cb({change, {Change}, _}, _, {_Server, DbSuffix, _Count} = Acc) ->
scan_changes_cb(_, _, Acc) ->
Acc.
+is_deleted(Change) -> % the change row's <<"deleted">> value, false when absent
+ couch_util:get_value(<<"deleted">>, Change, false).
+
+shards_db() -> % mem3 shards db name from config, default "_dbs"
+ config:get("mem3", "shards_db", "_dbs").
+
local_shards(DbName) ->
try
[ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
@@ -288,12 +288,9 @@ jitter(N) ->
couch_rand:uniform(Range).
scan_local_db(Server, DbSuffix) when is_pid(Server) ->
- case couch_db:open_int(DbSuffix, [?CTX, sys_db, nologifmissing]) of
- {ok, Db} ->
- gen_server:cast(Server, {resume_scan, DbSuffix}),
- ok = couch_db:close(Db);
- _Error ->
- ok
+ case couch_server:exists(DbSuffix) of % existence check only; the db is no longer opened here
+ true -> gen_server:cast(Server, {resume_scan, DbSuffix});
+ false -> ok
end.
is_design_doc({Change}) ->
@@ -311,7 +308,6 @@ is_design_doc_id(_) ->
-ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-include_lib("couch/include/couch_eunit.hrl").
-define(MOD, multidb_test_module).
@@ -328,32 +324,33 @@ couch_multidb_changes_test_() ->
fun setup/0,
fun teardown/1,
[
- t_handle_call_change(),
- t_handle_call_change_filter_design_docs(),
- t_handle_call_checkpoint_new(),
- t_handle_call_checkpoint_existing(),
- t_handle_info_created(),
- t_handle_info_deleted(),
- t_handle_info_updated(),
- t_handle_info_other_event(),
- t_handle_info_created_other_db(),
- t_handle_info_scanner_exit_normal(),
- t_handle_info_scanner_crashed(),
- t_handle_info_event_server_exited(),
- t_handle_info_unknown_pid_exited(),
- t_handle_info_change_feed_exited(),
- t_handle_info_change_feed_exited_and_need_rescan(),
- t_spawn_changes_reader(),
- t_changes_reader_cb_change(),
- t_changes_reader_cb_stop(),
- t_changes_reader_cb_other(),
- t_handle_call_resume_scan_no_chfeed_no_ets_entry(),
- t_handle_call_resume_scan_chfeed_no_ets_entry(),
- t_handle_call_resume_scan_chfeed_ets_entry(),
- t_handle_call_resume_scan_no_chfeed_ets_entry(),
- t_start_link(),
- t_start_link_no_ddocs(),
- t_misc_gen_server_callbacks()
+ ?TDEF_FE(t_handle_call_change),
+ ?TDEF_FE(t_handle_call_change_filter_design_docs),
+ ?TDEF_FE(t_handle_call_checkpoint_new),
+ ?TDEF_FE(t_handle_call_checkpoint_existing),
+ ?TDEF_FE(t_handle_call_checkpoint_stale_changes_pid), % checkpoint from a non-owner pid is ignored
+ ?TDEF_FE(t_handle_info_created),
+ ?TDEF_FE(t_handle_info_deleted),
+ ?TDEF_FE(t_handle_info_updated),
+ ?TDEF_FE(t_handle_info_other_event),
+ ?TDEF_FE(t_handle_info_created_other_db),
+ ?TDEF_FE(t_handle_info_scanner_exit_normal),
+ ?TDEF_FE(t_handle_info_scanner_crashed),
+ ?TDEF_FE(t_handle_info_event_server_exited),
+ ?TDEF_FE(t_handle_info_unknown_pid_exited),
+ ?TDEF_FE(t_handle_info_change_feed_exited),
+ ?TDEF_FE(t_handle_info_change_feed_exited_and_need_rescan),
+ ?TDEF_FE(t_spawn_changes_reader),
+ ?TDEF_FE(t_changes_reader_cb_change),
+ ?TDEF_FE(t_changes_reader_cb_stop),
+ ?TDEF_FE(t_changes_reader_cb_other),
+ ?TDEF_FE(t_handle_call_resume_scan_no_chfeed_no_ets_entry),
+ ?TDEF_FE(t_handle_call_resume_scan_chfeed_no_ets_entry),
+ ?TDEF_FE(t_handle_call_resume_scan_chfeed_ets_entry),
+ ?TDEF_FE(t_handle_call_resume_scan_no_chfeed_ets_entry),
+ ?TDEF_FE(t_start_link),
+ ?TDEF_FE(t_start_link_no_ddocs),
+ ?TDEF_FE(t_misc_gen_server_callbacks)
]
}
}.
@@ -362,7 +359,7 @@ setup_all() ->
mock_logs(),
mock_callback_mod(),
meck:expect(couch_event, register_all, 1, ok),
- meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"),
+ test_util:start_applications([config]), % real config app replaces the old config:get mock
meck:expect(mem3_util, ensure_exists, 1, {ok, dbs}),
ChangesFun = meck:val(fun(_) -> ok end),
meck:expect(couch_changes, handle_db_changes, 3, ChangesFun),
@@ -387,6 +384,7 @@ setup_all() ->
EvtPid.
teardown_all(EvtPid) ->
+ test_util:stop_applications([config]), % undo start_applications/1 from setup_all/0
unlink(EvtPid),
exit(EvtPid, kill),
meck:unload().
@@ -403,322 +401,285 @@ setup() ->
teardown(_) ->
ok.
-t_handle_call_change() ->
- ?_test(begin
- State = mock_state(),
- Change = change_row(<<"blah">>),
- handle_call_ok({change, ?DBNAME, Change}, State),
- ?assert(meck:validate(?MOD)),
- ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig]))
- end).
-
-t_handle_call_change_filter_design_docs() ->
- ?_test(begin
- State0 = mock_state(),
- State = State0#state{skip_ddocs = true},
- Change = change_row(<<"_design/blah">>),
- handle_call_ok({change, ?DBNAME, Change}, State),
- ?assert(meck:validate(?MOD)),
- ?assertNot(meck:called(?MOD, db_change, [?DBNAME, Change, zig]))
- end).
-
-t_handle_call_checkpoint_new() ->
- ?_test(begin
- Tid = mock_ets(),
- State = mock_state(Tid),
- handle_call_ok({checkpoint, ?DBNAME, 1}, State),
- ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)),
- ets:delete(Tid)
- end).
-
-t_handle_call_checkpoint_existing() ->
- ?_test(begin
- Tid = mock_ets(),
- State = mock_state(Tid),
- true = ets:insert(Tid, {?DBNAME, 1, true}),
- handle_call_ok({checkpoint, ?DBNAME, 2}, State),
- ?assertEqual([{?DBNAME, 2, true}], ets:tab2list(Tid)),
- ets:delete(Tid)
- end).
-
-t_handle_info_created() ->
- ?_test(begin
- Tid = mock_ets(),
- State = mock_state(Tid),
- handle_info_check({'$couch_event', ?DBNAME, created}, State),
- ?assert(meck:validate(?MOD)),
- ?assert(meck:called(?MOD, db_created, [?DBNAME, zig]))
- end).
-
-t_handle_info_deleted() ->
- ?_test(begin
- State = mock_state(),
- handle_info_check({'$couch_event', ?DBNAME, deleted}, State),
- ?assert(meck:validate(?MOD)),
- ?assert(meck:called(?MOD, db_deleted, [?DBNAME, zig]))
- end).
-
-t_handle_info_updated() ->
- ?_test(begin
- Tid = mock_ets(),
- State = mock_state(Tid),
- handle_info_check({'$couch_event', ?DBNAME, updated}, State),
- ?assert(meck:validate(?MOD)),
- ?assert(meck:called(?MOD, db_found, [?DBNAME, zig]))
- end).
-
-t_handle_info_other_event() ->
- ?_test(begin
- State = mock_state(),
- handle_info_check({'$couch_event', ?DBNAME, somethingelse}, State),
- ?assertNot(meck:called(?MOD, db_created, [?DBNAME, somethingelse])),
- ?assertNot(meck:called(?MOD, db_deleted, [?DBNAME, somethingelse])),
- ?assertNot(meck:called(?MOD, db_found, [?DBNAME, somethingelse]))
- end).
-
-t_handle_info_created_other_db() ->
- ?_test(begin
- State = mock_state(),
- handle_info_check({'$couch_event', <<"otherdb">>, created}, State),
- ?assertNot(meck:called(?MOD, db_created, [?DBNAME, zig]))
- end).
-
-t_handle_info_scanner_exit_normal() ->
- ?_test(begin
- Res = handle_info({'EXIT', spid, normal}, mock_state()),
- ?assertMatch({noreply, _}, Res),
- {noreply, RState} = Res,
- ?assertEqual(nil, RState#state.scanner)
- end).
-
-t_handle_info_scanner_crashed() ->
- ?_test(begin
- Res = handle_info({'EXIT', spid, oops}, mock_state()),
- ?assertMatch({stop, {scanner_died, oops}, _State}, Res)
- end).
-
-t_handle_info_event_server_exited() ->
- ?_test(begin
- Res = handle_info({'DOWN', esref, type, espid, reason}, mock_state()),
- ?assertMatch({stop, {couch_event_server_died, reason}, _}, Res)
- end).
-
-t_handle_info_unknown_pid_exited() ->
- ?_test(begin
- State0 = mock_state(),
- Res0 = handle_info({'EXIT', somepid, normal}, State0),
- ?assertMatch({noreply, State0}, Res0),
- State1 = mock_state(),
- Res1 = handle_info({'EXIT', somepid, oops}, State1),
- ?assertMatch({stop, {unexpected_exit, somepid, oops}, State1}, Res1)
- end).
-
-t_handle_info_change_feed_exited() ->
- ?_test(begin
- Tid0 = mock_ets(),
- State0 = mock_state(Tid0, cpid),
- Res0 = handle_info({'EXIT', cpid, normal}, State0),
- ?assertMatch({noreply, _}, Res0),
- {noreply, RState0} = Res0,
- ?assertEqual([], RState0#state.pids),
- ets:delete(Tid0),
- Tid1 = mock_ets(),
- State1 = mock_state(Tid1, cpid),
- Res1 = handle_info({'EXIT', cpid, oops}, State1),
- ?assertMatch({noreply, _}, Res1),
- {noreply, RState1} = Res1,
- ?assertEqual([], RState1#state.pids),
- ets:delete(Tid1)
- end).
-
-t_handle_info_change_feed_exited_and_need_rescan() ->
- ?_test(begin
- Tid = mock_ets(),
- true = ets:insert(Tid, {?DBNAME, 1, true}),
- State = mock_state(Tid, cpid),
- Res = handle_info({'EXIT', cpid, normal}, State),
- ?assertMatch({noreply, _}, Res),
- {noreply, RState} = Res,
- % rescan flag should have been reset to false
- ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)),
- % a mock change feed process should be running
- [{?DBNAME, Pid}] = RState#state.pids,
- ?assert(is_pid(Pid)),
- ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
- ?assertEqual({self(), ?DBNAME}, ChArgs),
- ets:delete(Tid)
- end).
-
-t_spawn_changes_reader() ->
- ?_test(begin
- Pid = start_changes_reader(?DBNAME, 3),
- ?assert(erlang:is_process_alive(Pid)),
- ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
- ?assertEqual({self(), ?DBNAME}, ChArgs),
- ?assert(meck:validate(couch_db)),
- ?assert(meck:validate(couch_changes)),
- ?assert(meck:called(couch_db, open_int, [?DBNAME, [?CTX, sys_db]])),
- ?assert(
- meck:called(couch_changes, handle_db_changes, [
- #changes_args{
- include_docs = true,
- since = 3,
- feed = "normal",
- timeout = infinity
- },
- {json_req, null},
- db
- ])
- )
- end).
-
-t_changes_reader_cb_change() ->
- ?_test(begin
- {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
- Change = change_row(<<"blah">>),
- ChArg = {change, Change, ignore},
- {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}),
- ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig])),
- unlink(Pid),
- exit(Pid, kill)
- end).
-
-t_changes_reader_cb_stop() ->
- ?_test(begin
- {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
- ChArg = {stop, 11},
- {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}),
- % We checkpoint on stop, check if checkpointed at correct sequence
- #state{tid = Tid} = sys:get_state(Pid),
- ?assertEqual([{?DBNAME, 11, false}], ets:tab2list(Tid)),
- unlink(Pid),
- exit(Pid, kill)
- end).
-
-t_changes_reader_cb_other() ->
- ?_assertEqual(acc, changes_reader_cb(other, chtype, acc)).
-
-t_handle_call_resume_scan_no_chfeed_no_ets_entry() ->
- ?_test(begin
- Tid = mock_ets(),
- State = mock_state(Tid),
- RState = resume_scan(?DBNAME, State),
- % Check if inserted checkpoint entry in ets starting at 0
- ?assertEqual([{?DBNAME, 0, false}], ets:tab2list(Tid)),
- % Check if called db_found callback
- ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])),
- % Check if started a change reader
- [{?DBNAME, Pid}] = RState#state.pids,
- ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
- ?assertEqual({self(), ?DBNAME}, ChArgs),
- ?assert(
- meck:called(couch_changes, handle_db_changes, [
- #changes_args{
- include_docs = true,
- since = 0,
- feed = "normal",
- timeout = infinity
- },
- {json_req, null},
- db
- ])
- ),
- ets:delete(Tid)
- end).
-
-t_handle_call_resume_scan_chfeed_no_ets_entry() ->
- ?_test(begin
- Tid = mock_ets(),
- Pid = start_changes_reader(?DBNAME, 0),
- State = mock_state(Tid, Pid),
- resume_scan(?DBNAME, State),
- % Check ets checkpoint is set to 0 and rescan = true
- ?assertEqual([{?DBNAME, 0, true}], ets:tab2list(Tid)),
- ets:delete(Tid),
- kill_mock_changes_reader_and_get_its_args(Pid)
- end).
-
-t_handle_call_resume_scan_chfeed_ets_entry() ->
- ?_test(begin
- Tid = mock_ets(),
- true = ets:insert(Tid, [{?DBNAME, 2, false}]),
- Pid = start_changes_reader(?DBNAME, 1),
- State = mock_state(Tid, Pid),
- resume_scan(?DBNAME, State),
- % Check ets checkpoint is set to same endseq but rescan = true
- ?assertEqual([{?DBNAME, 2, true}], ets:tab2list(Tid)),
- ets:delete(Tid),
- kill_mock_changes_reader_and_get_its_args(Pid)
- end).
-
-t_handle_call_resume_scan_no_chfeed_ets_entry() ->
- ?_test(begin
- Tid = mock_ets(),
- true = ets:insert(Tid, [{?DBNAME, 1, true}]),
- State = mock_state(Tid),
- RState = resume_scan(?DBNAME, State),
- % Check if reset rescan to false but kept same endseq
- ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)),
- % Check if started a change reader
- [{?DBNAME, Pid}] = RState#state.pids,
- ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
- ?assertEqual({self(), ?DBNAME}, ChArgs),
- ?assert(
- meck:called(couch_changes, handle_db_changes, [
- #changes_args{
- include_docs = true,
- since = 1,
- feed = "normal",
- timeout = infinity
- },
- {json_req, null},
- db
- ])
- ),
- ets:delete(Tid)
- end).
-
-t_start_link() ->
- ?_test(begin
- {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, []),
- ?assert(is_pid(Pid)),
- ?assertMatch(
- #state{
- mod = ?MOD,
- suffix = ?SUFFIX,
- ctx = nil,
- pids = [],
- skip_ddocs = false
+t_handle_call_change(_) ->
+ State = mock_state(),
+ Change = change_row(<<"blah">>),
+ handle_call_ok({change, ?DBNAME, Change}, State),
+ ?assert(meck:validate(?MOD)),
+ ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig])).
+
+t_handle_call_change_filter_design_docs(_) ->
+ State0 = mock_state(),
+ State = State0#state{skip_ddocs = true},
+ Change = change_row(<<"_design/blah">>),
+ handle_call_ok({change, ?DBNAME, Change}, State),
+ ?assert(meck:validate(?MOD)),
+ ?assertNot(meck:called(?MOD, db_change, [?DBNAME, Change, zig])).
+
+t_handle_call_checkpoint_new(_) ->
+ Tid = mock_ets(),
+ State = mock_state(Tid),
+ handle_call_ok({checkpoint, ?DBNAME, 1}, cpid, State),
+ ?assertEqual([], ets:tab2list(Tid)), % db not tracked yet: the checkpoint is ignored, not inserted
+ ets:delete(Tid).
+
+t_handle_call_checkpoint_existing(_) ->
+ Tid = mock_ets(),
+ State = mock_state(Tid, cpid),
+ true = ets:insert(Tid, {?DBNAME, 1, true, cpid}),
+ handle_call_ok({checkpoint, ?DBNAME, 2}, cpid, State),
+ ?assertEqual([{?DBNAME, 2, true, cpid}], ets:tab2list(Tid)), % EndSeq updated, Rescan flag kept
+ ets:delete(Tid).
+
+t_handle_call_checkpoint_stale_changes_pid(_) ->
+ Tid = mock_ets(),
+ State = mock_state(Tid, cpid),
+ handle_call_ok({checkpoint, ?DBNAME, 42}, other, State),
+ ?assertEqual([{?DBNAME, 0, false, cpid}], ets:tab2list(Tid)),
+ ets:delete(Tid).
+
+t_handle_info_created(_) ->
+ Tid = mock_ets(),
+ State = mock_state(Tid),
+ handle_info_check({'$couch_event', ?DBNAME, created}, State),
+ ?assert(meck:validate(?MOD)),
+ ?assert(meck:called(?MOD, db_created, [?DBNAME, zig])).
+
+t_handle_info_deleted(_) ->
+ State = mock_state(),
+ handle_info_check({'$couch_event', ?DBNAME, deleted}, State),
+ ?assert(meck:validate(?MOD)),
+ ?assert(meck:called(?MOD, db_deleted, [?DBNAME, zig])).
+
+t_handle_info_updated(_) ->
+ Tid = mock_ets(),
+ State = mock_state(Tid),
+ handle_info_check({'$couch_event', ?DBNAME, updated}, State),
+ ?assert(meck:validate(?MOD)),
+ ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])).
+
+t_handle_info_other_event(_) ->
+ State = mock_state(),
+ handle_info_check({'$couch_event', ?DBNAME, somethingelse}, State),
+ ?assertNot(meck:called(?MOD, db_created, [?DBNAME, somethingelse])),
+ ?assertNot(meck:called(?MOD, db_deleted, [?DBNAME, somethingelse])),
+ ?assertNot(meck:called(?MOD, db_found, [?DBNAME, somethingelse])).
+
+t_handle_info_created_other_db(_) ->
+ State = mock_state(),
+ handle_info_check({'$couch_event', <<"otherdb">>, created}, State),
+ ?assertNot(meck:called(?MOD, db_created, [?DBNAME, zig])).
+
+t_handle_info_scanner_exit_normal(_) ->
+ Res = handle_info({'EXIT', spid, normal}, mock_state()),
+ ?assertMatch({noreply, _}, Res),
+ {noreply, RState} = Res,
+ ?assertEqual(nil, RState#state.scanner).
+
+t_handle_info_scanner_crashed(_) ->
+ Res = handle_info({'EXIT', spid, oops}, mock_state()),
+ ?assertMatch({stop, {scanner_died, oops}, _State}, Res).
+
+t_handle_info_event_server_exited(_) ->
+ Res = handle_info({'DOWN', esref, type, espid, reason}, mock_state()),
+ ?assertMatch({stop, {couch_event_server_died, reason}, _}, Res).
+
+t_handle_info_unknown_pid_exited(_) ->
+ State0 = mock_state(),
+ Res0 = handle_info({'EXIT', somepid, normal}, State0),
+ ?assertMatch({stop, {unexpected_exit, somepid, normal}, State0}, Res0), % unknown pids stop the server even on a normal exit
+ State1 = mock_state(),
+ Res1 = handle_info({'EXIT', somepid, oops}, State1),
+ ?assertMatch({stop, {unexpected_exit, somepid, oops}, State1}, Res1).
+
+t_handle_info_change_feed_exited(_) ->
+ Tid0 = mock_ets(),
+ State0 = mock_state(Tid0, cpid),
+ Res0 = handle_info({'EXIT', cpid, normal}, State0),
+ ?assertMatch({noreply, _}, Res0),
+ {noreply, RState0} = Res0,
+ ?assertEqual(#{}, RState0#state.pids),
+ ets:delete(Tid0),
+ Tid1 = mock_ets(),
+ State1 = mock_state(Tid1, cpid),
+ Res1 = handle_info({'EXIT', cpid, oops}, State1),
+ ?assertMatch({noreply, _}, Res1),
+ {noreply, RState1} = Res1,
+ ?assertEqual(#{}, RState1#state.pids),
+ ets:delete(Tid1).
+
+t_handle_info_change_feed_exited_and_need_rescan(_) ->
+ Tid = mock_ets(),
+ State = mock_state(Tid, cpid),
+ true = ets:insert(Tid, {?DBNAME, 1, true, cpid}), % overwrite the row mock_state/2 seeded: a rescan is pending
+ Res = handle_info({'EXIT', cpid, normal}, State),
+ ?assertMatch({noreply, _}, Res),
+ {noreply, RState} = Res,
+ % a mock change feed process should be running
+ [{Pid, ?DBNAME}] = maps:to_list(RState#state.pids),
+ ?assert(is_pid(Pid)),
+ % rescan flag should have been reset to false
+ ?assertEqual([{?DBNAME, 1, false, Pid}], ets:tab2list(Tid)),
+ ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
+ ?assertEqual({self(), ?DBNAME}, ChArgs),
+ ets:delete(Tid).
+
+t_spawn_changes_reader(_) ->
+ Pid = start_changes_reader(?DBNAME, 3),
+ ?assert(erlang:is_process_alive(Pid)),
+ ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
+ ?assertEqual({self(), ?DBNAME}, ChArgs),
+ ?assert(meck:validate(couch_db)),
+ ?assert(meck:validate(couch_changes)),
+ ?assert(meck:called(couch_db, open_int, [?DBNAME, [?CTX, sys_db]])),
+ ?assert(
+ meck:called(couch_changes, handle_db_changes, [
+ #changes_args{
+ include_docs = true,
+ since = 3, % the Since passed to start_changes_reader/2 above
+ feed = "normal",
+ timeout = infinity
},
- sys:get_state(Pid)
- ),
- unlink(Pid),
- exit(Pid, kill),
- ?assert(meck:called(couch_event, register_all, [Pid]))
- end).
-
-t_start_link_no_ddocs() ->
- ?_test(begin
- {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, [skip_ddocs]),
- ?assert(is_pid(Pid)),
- ?assertMatch(
- #state{
- mod = ?MOD,
- suffix = ?SUFFIX,
- ctx = nil,
- pids = [],
- skip_ddocs = true
+ {json_req, null},
+ db
+ ])
+ ).
+
+t_changes_reader_cb_change(_) ->
+ {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []),
+ Change = change_row(<<"blah">>),
+ ChArg = {change, Change, ignore},
+ {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}),
+ ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig])),
+ unlink(Pid),
+ exit(Pid, kill).
+
+t_changes_reader_cb_stop(_) ->
+ {ok, ServerPid} = start_link(?SUFFIX, ?MOD, zig, []),
+ #state{tid = Tid} = sys:get_state(ServerPid),
+ ChPid = self(),
+ true = ets:insert(Tid, {?DBNAME, 1, false, ChPid}), % this test process poses as the owning change feed
+ sys:replace_state(ServerPid, fun(#state{} = OldSt) ->
+ OldSt#state{pids = #{ChPid => ?DBNAME}}
+ end),
+ ChArg = {stop, 11},
+ {ServerPid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {ServerPid, ?DBNAME}),
+ % We checkpoint on stop, check if checkpointed at correct sequence
+ #state{tid = Tid, pids = Pids} = sys:get_state(ServerPid),
+ ?assertMatch(#{ChPid := ?DBNAME}, Pids),
+ ?assertEqual([{?DBNAME, 11, false, ChPid}], ets:tab2list(Tid)),
+ unlink(ServerPid),
+ exit(ServerPid, kill).
+
+t_changes_reader_cb_other(_) ->
+ ?assertEqual(acc, changes_reader_cb(other, chtype, acc)).
+
+t_handle_call_resume_scan_no_chfeed_no_ets_entry(_) ->
+ Tid = mock_ets(),
+ State = mock_state(Tid),
+ RState = resume_scan(?DBNAME, State),
+ % Check if called db_found callback
+ ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])),
+ % Check if started a change reader
+ [{Pid, ?DBNAME}] = maps:to_list(RState#state.pids),
+ % Check if inserted checkpoint entry in ets starting at 0
+ ?assertEqual([{?DBNAME, 0, false, Pid}], ets:tab2list(Tid)),
+ ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
+ ?assertEqual({self(), ?DBNAME}, ChArgs),
+ ?assert(
+ meck:called(couch_changes, handle_db_changes, [
+ #changes_args{
+ include_docs = true,
+ since = 0,
+ feed = "normal",
+ timeout = infinity
+ },
+ {json_req, null},
+ db
+ ])
+ ),
+ ets:delete(Tid).
+
+t_handle_call_resume_scan_chfeed_no_ets_entry(_) ->
+ Tid = mock_ets(),
+ Pid = start_changes_reader(?DBNAME, 0),
+ State = mock_state(Tid, Pid), % NOTE(review): mock_state/2 also seeds {?DBNAME, 0, false, Pid}, so an ets entry exists despite the test name
+ resume_scan(?DBNAME, State),
+ % Check ets checkpoint is set to 0 and rescan = true
+ ?assertEqual([{?DBNAME, 0, true, Pid}], ets:tab2list(Tid)),
+ ets:delete(Tid),
+ kill_mock_changes_reader_and_get_its_args(Pid).
+
+t_handle_call_resume_scan_chfeed_ets_entry(_) ->
+ Tid = mock_ets(),
+ Pid = start_changes_reader(?DBNAME, 1),
+ State = mock_state(Tid, Pid),
+ true = ets:insert(Tid, [{?DBNAME, 2, false, Pid}]), % overwrite the row mock_state/2 seeded (EndSeq 0 -> 2)
+ resume_scan(?DBNAME, State),
+ % Check ets checkpoint is set to same endseq but rescan = true
+ ?assertEqual([{?DBNAME, 2, true, Pid}], ets:tab2list(Tid)),
+ ets:delete(Tid),
+ kill_mock_changes_reader_and_get_its_args(Pid).
+
+t_handle_call_resume_scan_no_chfeed_ets_entry(_) ->
+ Tid = mock_ets(),
+ true = ets:insert(Tid, [{?DBNAME, 1, true, undefined}]), % checkpoint at 1 with no live feed (pid slot undefined)
+ State = mock_state(Tid),
+ RState = resume_scan(?DBNAME, State),
+ % Check if started a change reader
+ [{Pid, ?DBNAME}] = maps:to_list(RState#state.pids),
+ % Check if reset rescan to false but kept same endseq
+ ?assertEqual([{?DBNAME, 1, false, Pid}], ets:tab2list(Tid)),
+ ChArgs = kill_mock_changes_reader_and_get_its_args(Pid),
+ ?assertEqual({self(), ?DBNAME}, ChArgs),
+ ?assert(
+ meck:called(couch_changes, handle_db_changes, [
+ #changes_args{
+ include_docs = true,
+ since = 1,
+ feed = "normal",
+ timeout = infinity
},
- sys:get_state(Pid)
- ),
- unlink(Pid),
- exit(Pid, kill)
- end).
+ {json_req, null},
+ db
+ ])
+ ),
+ ets:delete(Tid).
+
+t_start_link(_) ->
+ {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, []),
+ ?assert(is_pid(Pid)),
+ ?assertMatch(
+ #state{
+ mod = ?MOD,
+ suffix = ?SUFFIX,
+ ctx = nil,
+ pids = #{}, % NOTE(review): as a pattern, #{} matches any map, not only an empty one
+ skip_ddocs = false
+ },
+ sys:get_state(Pid)
+ ),
+ unlink(Pid),
+ exit(Pid, kill),
+ ?assert(meck:called(couch_event, register_all, [Pid])).
+
+t_start_link_no_ddocs(_) ->
+ {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, [skip_ddocs]),
+ ?assert(is_pid(Pid)),
+ ?assertMatch(
+ #state{
+ mod = ?MOD,
+ suffix = ?SUFFIX,
+ ctx = nil,
+ pids = #{}, % NOTE(review): as a pattern, #{} matches any map, not only an empty one
+ skip_ddocs = true
+ },
+ sys:get_state(Pid)
+ ),
+ unlink(Pid),
+ exit(Pid, kill).
-t_misc_gen_server_callbacks() ->
- ?_test(begin
- ?assertEqual(ok, terminate(reason, state))
- end).
+t_misc_gen_server_callbacks(_) ->
+ ?assertEqual(ok, terminate(reason, state)).
scan_dbs_test_() ->
{
@@ -734,45 +695,60 @@ scan_dbs_test_() ->
fabric:delete_db(GlobalDb, [?CTX]),
test_util:stop_couch(Ctx)
end,
- {with, [
- fun t_find_shard/1,
- fun t_shard_not_found/1,
- fun t_pass_local/1,
- fun t_fail_local/1
- ]}
+ with([
+ ?TDEF(t_find_shard),
+ ?TDEF(t_shard_not_found),
+ ?TDEF(t_pass_local),
+ ?TDEF(t_fail_local),
+ ?TDEF(t_scan_all_dbs)
+ ])
}.
t_find_shard({_, DbName, _}) ->
- ?_test(begin
- ?assertEqual(2, length(local_shards(DbName)))
- end).
+ ?assertEqual(2, length(local_shards(DbName))).
t_shard_not_found(_) ->
- ?_test(begin
- ?assertEqual([], local_shards(?tempdb()))
- end).
+ ?assertEqual([], local_shards(?tempdb())).
t_pass_local({_, _, LocalDb}) ->
- ?_test(begin
- scan_local_db(self(), LocalDb),
- receive
- {'$gen_cast', Msg} ->
- ?assertEqual(Msg, {resume_scan, LocalDb})
- after 0 ->
- ?assert(false)
- end
- end).
+ scan_local_db(self(), LocalDb),
+ receive
+ {'$gen_cast', Msg} ->
+ ?assertEqual(Msg, {resume_scan, LocalDb})
+ after 0 ->
+ ?assert(false)
+ end.
t_fail_local({_, _, LocalDb}) ->
- ?_test(begin
- scan_local_db(self(), <<"some_other_db">>),
- receive
- {'$gen_cast', Msg} ->
- ?assertNotEqual(Msg, {resume_scan, LocalDb})
- after 0 ->
- ?assert(true)
- end
- end).
+ scan_local_db(self(), <<"some_other_db">>),
+ receive
+ {'$gen_cast', Msg} ->
+ ?assertNotEqual(Msg, {resume_scan, LocalDb})
+ after 0 ->
+ ?assert(true)
+ end.
+
+t_scan_all_dbs({_, GlobalDb, _}) ->
+ scan_all_dbs(self(), GlobalDb),
+ ?assertMatch(
+ [
+ {'$gen_cast', {resume_scan, <<"shards/00000000-7fffffff/", _/binary>>}},
+ {'$gen_cast', {resume_scan, <<"shards/80000000-ffffffff/", _/binary>>}}
+ ],
+ lists:sort(flush([]))
+ ).
+
+flush(Acc) -> % collect mailbox messages until two have arrived, or return early after 1s of silence
+ receive
+ Msg ->
+ case [Msg | Acc] of
+ [_, _ | _] = Full -> Full;
+ Partial -> flush(Partial)
+ end
+ after 1000 ->
+ Acc
+ end.
% Test helper functions
@@ -826,7 +802,7 @@ mock_state() ->
suffix = ?SUFFIX,
event_server = esref,
scanner = spid,
- pids = []
+ pids = #{} % no change feeds tracked
}.
mock_state(Ets) ->
@@ -835,7 +811,8 @@ mock_state(Ets) ->
mock_state(Ets, Pid) ->
State = mock_state(Ets),
- State#state{pids = [{?DBNAME, Pid}]}.
+ ets:insert(State#state.tid, {?DBNAME, 0, false, Pid}), % keep ets in step with pids, as resume_scan/2 does
+ State#state{pids = #{Pid => ?DBNAME}}.
change_row(Id) when is_binary(Id) ->
{[
@@ -846,7 +823,11 @@ change_row(Id) when is_binary(Id) ->
]}.
handle_call_ok(Msg, State) ->
- ?assertMatch({reply, ok, _}, handle_call(Msg, from, State)).
+ handle_call_ok(Msg, from, State).
+
+handle_call_ok(Msg, FromPid, State) ->
+ FromTag = make_ref(), % handle_call/3 matches From as {Pid, Tag}
+ ?assertMatch({reply, ok, _}, handle_call(Msg, {FromPid, FromTag}, State)).
handle_info_check(Msg, State) ->
?assertMatch({noreply, _}, handle_info(Msg, State)).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl
index 192d113c6..76450a692 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_scheduler_docs_tests.erl
@@ -101,8 +101,8 @@ t_replicator_doc_state_fields_test_() ->
fun setup_prefixed_replicator_db/0,
fun teardown/1,
with([
- ?TDEF(t_doc_fields_are_updated, 10),
- ?TDEF(t_doc_fields_are_ignored, 10)
+ ?TDEF(t_doc_fields_are_updated, 15),
+ ?TDEF(t_doc_fields_are_ignored, 15)
])
}.
@@ -112,8 +112,8 @@ t_replicator_doc_state_fields_update_docs_true_test_() ->
fun setup_prefixed_replicator_db_with_update_docs_true/0,
fun teardown/1,
with([
- ?TDEF(t_doc_fields_are_updated, 10),
- ?TDEF(t_doc_fields_are_ignored, 10)
+ ?TDEF(t_doc_fields_are_updated, 15),
+ ?TDEF(t_doc_fields_are_ignored, 15)
])
}.
@@ -135,7 +135,7 @@ t_scheduler_docs_total_rows({_Ctx, {RepDb, Source, Target}}) ->
{_, #{}} -> wait
end
end,
- 10000,
+ 14000,
1000
),
Docs = maps:get(<<"docs">>, Body),
@@ -183,7 +183,7 @@ t_doc_fields_are_updated({_Ctx, {RepDb, Source, Target}}) ->
{_, #{}} -> wait
end
end,
- 10000,
+ 14000,
1000
),
?assertMatch(
@@ -225,7 +225,7 @@ t_doc_fields_are_ignored({_Ctx, {RepDb, Source, Target}}) ->
{_, #{}} -> wait
end
end,
- 10000,
+ 14000,
1000
),
?assertMatch(