This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch clean-up-multidb-changes in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 8eef76b8a3fabc60a5833cef5531ee85510cf492 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Tue Dec 12 12:11:45 2023 -0500 Optimize and clean up couch_multidb_changes couch_multidb_changes is in charge of monitoring changes to multiple databases. It is what drives the couch_replicator application when users update replication docs. It starts off by scanning all the local db shards and launches changes feeds for each of them. After it processes each changes feed, it checkpoints where it stops. As the shards are updated, it starts change feeds for those updated shards and checkpoints again. The checkpoints are kept in an ets table and the main logic which decides when to start a changes feed for a particular shard is in the resume_scan/2 function. This change makes a few small optimizations: * Use a map to track Pid -> DbName mappings. This avoids using an O(N) operation for looking up exiting change feed processes. So `pids` is switched to be a map of `#{Pid => DbName}` and the ets table has a 4th tuple member to keep track of `dbname -> pid` mappings. * Previously, when the plain "suffix" dbname (just <<"_replicator">>) was found, we tried to open and close it to see if it exists. Change to use `couch_server:exists/1` instead. There are also a few clean-ups: * Update tests to use ?TDEF_FE/?TDEF macros. This shortens them a bit and saves one indentation level. * Increase test coverage from 86% to 96%. To help create a few more test scenarios, switched to using a public ets table instead of a protected one. 
--- src/couch/src/couch_multidb_changes.erl | 832 ++++++++++++++++---------------- 1 file changed, 405 insertions(+), 427 deletions(-) diff --git a/src/couch/src/couch_multidb_changes.erl b/src/couch/src/couch_multidb_changes.erl index a9b4c4fb6..739afddef 100644 --- a/src/couch/src/couch_multidb_changes.erl +++ b/src/couch/src/couch_multidb_changes.erl @@ -46,7 +46,7 @@ suffix :: binary(), event_server :: reference(), scanner :: nil | pid(), - pids :: [{binary(), pid()}], + pids :: #{}, skip_ddocs :: boolean() }). @@ -85,36 +85,34 @@ init([DbSuffix, Module, Context, Opts]) -> process_flag(trap_exit, true), Server = self(), {ok, #state{ - tid = ets:new(?MODULE, [set, protected]), + tid = ets:new(?MODULE, [set, public]), mod = Module, ctx = Context, suffix = DbSuffix, event_server = register_with_event_server(Server), scanner = spawn_link(fun() -> scan_all_dbs(Server, DbSuffix) end), - pids = [], + pids = #{}, skip_ddocs = proplists:is_defined(skip_ddocs, Opts) }}. terminate(_Reason, _State) -> ok. -handle_call( - {change, DbName, Change}, - _From, - #state{skip_ddocs = SkipDDocs, mod = Mod, ctx = Ctx} = State -) -> +handle_call({change, DbName, Change}, _From, #state{} = State) -> + #state{skip_ddocs = SkipDDocs, mod = Mod, ctx = Ctx} = State, case {SkipDDocs, is_design_doc(Change)} of {true, true} -> {reply, ok, State}; {_, _} -> {reply, ok, State#state{ctx = Mod:db_change(DbName, Change, Ctx)}} end; -handle_call({checkpoint, DbName, EndSeq}, _From, #state{tid = Ets} = State) -> +handle_call({checkpoint, DbName, EndSeq}, {Pid, _Tag} = _From, #state{tid = Ets} = State) -> case ets:lookup(Ets, DbName) of - [] -> - true = ets:insert(Ets, {DbName, EndSeq, false}); - [{DbName, _OldSeq, Rescan}] -> - true = ets:insert(Ets, {DbName, EndSeq, Rescan}) + [{DbName, _OldSeq, Rescan, Pid}] -> + true = ets:insert(Ets, {DbName, EndSeq, Rescan, Pid}); + _ -> + % Ignore stale checkpoints or checkpoints from unknown change feeds + ok end, {reply, ok, State}. 
@@ -134,10 +132,10 @@ handle_info({'EXIT', From, normal}, #state{scanner = From} = State) -> {noreply, State#state{scanner = nil}}; handle_info({'EXIT', From, Reason}, #state{scanner = From} = State) -> {stop, {scanner_died, Reason}, State}; -handle_info({'EXIT', From, Reason}, #state{pids = Pids} = State) -> +handle_info({'EXIT', From, Reason}, #state{pids = #{} = Pids} = State) -> couch_log:debug("~p change feed exited ~p", [State#state.suffix, From]), - case lists:keytake(From, 2, Pids) of - {value, {DbName, From}, NewPids} -> + case maps:take(From, Pids) of + {DbName, NewPids} -> if Reason == normal -> ok; @@ -147,14 +145,15 @@ handle_info({'EXIT', From, Reason}, #state{pids = Pids} = State) -> end, NewState = State#state{pids = NewPids}, case ets:lookup(State#state.tid, DbName) of - [{DbName, _EndSeq, true}] -> + [{DbName, _EndSeq, true, From}] -> + % Match the From pid explicitly and then clear it + % The pid is at 4th position in the ets object + ets:update_element(State#state.tid, DbName, {4, undefined}), {noreply, resume_scan(DbName, NewState)}; _ -> {noreply, NewState} end; - false when Reason == normal -> - {noreply, State}; - false -> + error -> Fmt = "~s(~p) : Unknown pid ~w died :: ~w", couch_log:error(Fmt, [?MODULE, State#state.suffix, From, Reason]), {stop, {unexpected_exit, From, Reason}, State} @@ -182,34 +181,28 @@ db_callback(_Other, _DbName, State) -> State. -spec resume_scan(binary(), #state{}) -> #state{}. -resume_scan(DbName, #state{pids = Pids, tid = Ets} = State) -> - case {lists:keyfind(DbName, 1, Pids), ets:lookup(Ets, DbName)} of - {{DbName, _}, []} -> - % Found existing change feed, but not entry in ETS - % Flag a need to rescan from begining - true = ets:insert(Ets, {DbName, 0, true}), - State; - {{DbName, _}, [{DbName, EndSeq, _}]} -> +resume_scan(DbName, #state{pids = #{} = Pids, tid = Ets} = State) -> + case ets:lookup(Ets, DbName) of + [{DbName, EndSeq, _, undefined}] -> + % No existing change feed running. 
Found existing checkpoint. + % Start a new change reader from last checkpoint. + Pid = start_changes_reader(DbName, EndSeq), + true = ets:insert(Ets, {DbName, EndSeq, false, Pid}), + State#state{pids = Pids#{Pid => DbName}}; + [{DbName, EndSeq, _, Pid}] -> % Found existing change feed and entry in ETS % Flag a need to rescan from last ETS checkpoint - true = ets:insert(Ets, {DbName, EndSeq, true}), + true = ets:insert(Ets, {DbName, EndSeq, true, Pid}), State; - {false, []} -> - % No existing change feed running. No entry in ETS. - % This is first time seeing this db shard. - % Notify user with a found callback. Insert checkpoint - % entry in ETS to start from 0. And start a change feed. - true = ets:insert(Ets, {DbName, 0, false}), + [] -> + % No entry in ETS. This is first time seeing this db shard. Notify + % user with a found callback. Insert checkpoint entry in ETS to + % start from 0. And start a change feed. + Pid = start_changes_reader(DbName, 0), + true = ets:insert(Ets, {DbName, 0, false, Pid}), Mod = State#state.mod, Ctx = Mod:db_found(DbName, State#state.ctx), - Pid = start_changes_reader(DbName, 0), - State#state{ctx = Ctx, pids = [{DbName, Pid} | Pids]}; - {false, [{DbName, EndSeq, _}]} -> - % No existing change feed running. Found existing checkpoint. - % Start a new change reader from last checkpoint. - true = ets:insert(Ets, {DbName, EndSeq, false}), - Pid = start_changes_reader(DbName, EndSeq), - State#state{pids = [{DbName, Pid} | Pids]} + State#state{ctx = Ctx, pids = Pids#{Pid => DbName}} end. 
start_changes_reader(DbName, Since) -> @@ -237,9 +230,7 @@ changes_reader_cb(_, _, Acc) -> scan_all_dbs(Server, DbSuffix) when is_pid(Server) -> ok = scan_local_db(Server, DbSuffix), - {ok, Db} = mem3_util:ensure_exists( - config:get("mem3", "shards_db", "_dbs") - ), + {ok, Db} = mem3_util:ensure_exists(shards_db()), ChangesFun = couch_changes:handle_db_changes(#changes_args{}, nil, Db), ChangesFun({fun scan_changes_cb/3, {Server, DbSuffix, 1}}), couch_db:close(Db). @@ -251,7 +242,7 @@ scan_changes_cb({change, {Change}, _}, _, {_Server, DbSuffix, _Count} = Acc) -> Acc; _Else -> NameMatch = DbSuffix =:= couch_db:dbname_suffix(DbName), - case {NameMatch, couch_replicator_utils:is_deleted(Change)} of + case {NameMatch, is_deleted(Change)} of {false, _} -> Acc; {true, true} -> @@ -264,6 +255,12 @@ scan_changes_cb({change, {Change}, _}, _, {_Server, DbSuffix, _Count} = Acc) -> scan_changes_cb(_, _, Acc) -> Acc. +is_deleted(Change) -> + couch_util:get_value(<<"deleted">>, Change, false). + +shards_db() -> + config:get("mem3", "shards_db", "_dbs"). + local_shards(DbName) -> try [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)] @@ -288,12 +285,9 @@ jitter(N) -> couch_rand:uniform(Range). scan_local_db(Server, DbSuffix) when is_pid(Server) -> - case couch_db:open_int(DbSuffix, [?CTX, sys_db, nologifmissing]) of - {ok, Db} -> - gen_server:cast(Server, {resume_scan, DbSuffix}), - ok = couch_db:close(Db); - _Error -> - ok + case couch_server:exists(DbSuffix) of + true -> gen_server:cast(Server, {resume_scan, DbSuffix}); + false -> ok end. is_design_doc({Change}) -> @@ -311,7 +305,6 @@ is_design_doc_id(_) -> -ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). -include_lib("couch/include/couch_eunit.hrl"). -define(MOD, multidb_test_module). 
@@ -328,32 +321,33 @@ couch_multidb_changes_test_() -> fun setup/0, fun teardown/1, [ - t_handle_call_change(), - t_handle_call_change_filter_design_docs(), - t_handle_call_checkpoint_new(), - t_handle_call_checkpoint_existing(), - t_handle_info_created(), - t_handle_info_deleted(), - t_handle_info_updated(), - t_handle_info_other_event(), - t_handle_info_created_other_db(), - t_handle_info_scanner_exit_normal(), - t_handle_info_scanner_crashed(), - t_handle_info_event_server_exited(), - t_handle_info_unknown_pid_exited(), - t_handle_info_change_feed_exited(), - t_handle_info_change_feed_exited_and_need_rescan(), - t_spawn_changes_reader(), - t_changes_reader_cb_change(), - t_changes_reader_cb_stop(), - t_changes_reader_cb_other(), - t_handle_call_resume_scan_no_chfeed_no_ets_entry(), - t_handle_call_resume_scan_chfeed_no_ets_entry(), - t_handle_call_resume_scan_chfeed_ets_entry(), - t_handle_call_resume_scan_no_chfeed_ets_entry(), - t_start_link(), - t_start_link_no_ddocs(), - t_misc_gen_server_callbacks() + ?TDEF_FE(t_handle_call_change), + ?TDEF_FE(t_handle_call_change_filter_design_docs), + ?TDEF_FE(t_handle_call_checkpoint_new), + ?TDEF_FE(t_handle_call_checkpoint_existing), + ?TDEF_FE(t_handle_call_checkpoint_stale_changes_pid), + ?TDEF_FE(t_handle_info_created), + ?TDEF_FE(t_handle_info_deleted), + ?TDEF_FE(t_handle_info_updated), + ?TDEF_FE(t_handle_info_other_event), + ?TDEF_FE(t_handle_info_created_other_db), + ?TDEF_FE(t_handle_info_scanner_exit_normal), + ?TDEF_FE(t_handle_info_scanner_crashed), + ?TDEF_FE(t_handle_info_event_server_exited), + ?TDEF_FE(t_handle_info_unknown_pid_exited), + ?TDEF_FE(t_handle_info_change_feed_exited), + ?TDEF_FE(t_handle_info_change_feed_exited_and_need_rescan), + ?TDEF_FE(t_spawn_changes_reader), + ?TDEF_FE(t_changes_reader_cb_change), + ?TDEF_FE(t_changes_reader_cb_stop), + ?TDEF_FE(t_changes_reader_cb_other), + ?TDEF_FE(t_handle_call_resume_scan_no_chfeed_no_ets_entry), + 
?TDEF_FE(t_handle_call_resume_scan_chfeed_no_ets_entry), + ?TDEF_FE(t_handle_call_resume_scan_chfeed_ets_entry), + ?TDEF_FE(t_handle_call_resume_scan_no_chfeed_ets_entry), + ?TDEF_FE(t_start_link), + ?TDEF_FE(t_start_link_no_ddocs), + ?TDEF_FE(t_misc_gen_server_callbacks) ] } }. @@ -362,7 +356,7 @@ setup_all() -> mock_logs(), mock_callback_mod(), meck:expect(couch_event, register_all, 1, ok), - meck:expect(config, get, ["mem3", "shards_db", '_'], "_dbs"), + test_util:start_applications([config]), meck:expect(mem3_util, ensure_exists, 1, {ok, dbs}), ChangesFun = meck:val(fun(_) -> ok end), meck:expect(couch_changes, handle_db_changes, 3, ChangesFun), @@ -387,6 +381,7 @@ setup_all() -> EvtPid. teardown_all(EvtPid) -> + test_util:stop_applications([config]), unlink(EvtPid), exit(EvtPid, kill), meck:unload(). @@ -403,322 +398,285 @@ setup() -> teardown(_) -> ok. -t_handle_call_change() -> - ?_test(begin - State = mock_state(), - Change = change_row(<<"blah">>), - handle_call_ok({change, ?DBNAME, Change}, State), - ?assert(meck:validate(?MOD)), - ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig])) - end). - -t_handle_call_change_filter_design_docs() -> - ?_test(begin - State0 = mock_state(), - State = State0#state{skip_ddocs = true}, - Change = change_row(<<"_design/blah">>), - handle_call_ok({change, ?DBNAME, Change}, State), - ?assert(meck:validate(?MOD)), - ?assertNot(meck:called(?MOD, db_change, [?DBNAME, Change, zig])) - end). - -t_handle_call_checkpoint_new() -> - ?_test(begin - Tid = mock_ets(), - State = mock_state(Tid), - handle_call_ok({checkpoint, ?DBNAME, 1}, State), - ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)), - ets:delete(Tid) - end). - -t_handle_call_checkpoint_existing() -> - ?_test(begin - Tid = mock_ets(), - State = mock_state(Tid), - true = ets:insert(Tid, {?DBNAME, 1, true}), - handle_call_ok({checkpoint, ?DBNAME, 2}, State), - ?assertEqual([{?DBNAME, 2, true}], ets:tab2list(Tid)), - ets:delete(Tid) - end). 
- -t_handle_info_created() -> - ?_test(begin - Tid = mock_ets(), - State = mock_state(Tid), - handle_info_check({'$couch_event', ?DBNAME, created}, State), - ?assert(meck:validate(?MOD)), - ?assert(meck:called(?MOD, db_created, [?DBNAME, zig])) - end). - -t_handle_info_deleted() -> - ?_test(begin - State = mock_state(), - handle_info_check({'$couch_event', ?DBNAME, deleted}, State), - ?assert(meck:validate(?MOD)), - ?assert(meck:called(?MOD, db_deleted, [?DBNAME, zig])) - end). - -t_handle_info_updated() -> - ?_test(begin - Tid = mock_ets(), - State = mock_state(Tid), - handle_info_check({'$couch_event', ?DBNAME, updated}, State), - ?assert(meck:validate(?MOD)), - ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])) - end). - -t_handle_info_other_event() -> - ?_test(begin - State = mock_state(), - handle_info_check({'$couch_event', ?DBNAME, somethingelse}, State), - ?assertNot(meck:called(?MOD, db_created, [?DBNAME, somethingelse])), - ?assertNot(meck:called(?MOD, db_deleted, [?DBNAME, somethingelse])), - ?assertNot(meck:called(?MOD, db_found, [?DBNAME, somethingelse])) - end). - -t_handle_info_created_other_db() -> - ?_test(begin - State = mock_state(), - handle_info_check({'$couch_event', <<"otherdb">>, created}, State), - ?assertNot(meck:called(?MOD, db_created, [?DBNAME, zig])) - end). - -t_handle_info_scanner_exit_normal() -> - ?_test(begin - Res = handle_info({'EXIT', spid, normal}, mock_state()), - ?assertMatch({noreply, _}, Res), - {noreply, RState} = Res, - ?assertEqual(nil, RState#state.scanner) - end). - -t_handle_info_scanner_crashed() -> - ?_test(begin - Res = handle_info({'EXIT', spid, oops}, mock_state()), - ?assertMatch({stop, {scanner_died, oops}, _State}, Res) - end). - -t_handle_info_event_server_exited() -> - ?_test(begin - Res = handle_info({'DOWN', esref, type, espid, reason}, mock_state()), - ?assertMatch({stop, {couch_event_server_died, reason}, _}, Res) - end). 
- -t_handle_info_unknown_pid_exited() -> - ?_test(begin - State0 = mock_state(), - Res0 = handle_info({'EXIT', somepid, normal}, State0), - ?assertMatch({noreply, State0}, Res0), - State1 = mock_state(), - Res1 = handle_info({'EXIT', somepid, oops}, State1), - ?assertMatch({stop, {unexpected_exit, somepid, oops}, State1}, Res1) - end). - -t_handle_info_change_feed_exited() -> - ?_test(begin - Tid0 = mock_ets(), - State0 = mock_state(Tid0, cpid), - Res0 = handle_info({'EXIT', cpid, normal}, State0), - ?assertMatch({noreply, _}, Res0), - {noreply, RState0} = Res0, - ?assertEqual([], RState0#state.pids), - ets:delete(Tid0), - Tid1 = mock_ets(), - State1 = mock_state(Tid1, cpid), - Res1 = handle_info({'EXIT', cpid, oops}, State1), - ?assertMatch({noreply, _}, Res1), - {noreply, RState1} = Res1, - ?assertEqual([], RState1#state.pids), - ets:delete(Tid1) - end). - -t_handle_info_change_feed_exited_and_need_rescan() -> - ?_test(begin - Tid = mock_ets(), - true = ets:insert(Tid, {?DBNAME, 1, true}), - State = mock_state(Tid, cpid), - Res = handle_info({'EXIT', cpid, normal}, State), - ?assertMatch({noreply, _}, Res), - {noreply, RState} = Res, - % rescan flag should have been reset to false - ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)), - % a mock change feed process should be running - [{?DBNAME, Pid}] = RState#state.pids, - ?assert(is_pid(Pid)), - ChArgs = kill_mock_changes_reader_and_get_its_args(Pid), - ?assertEqual({self(), ?DBNAME}, ChArgs), - ets:delete(Tid) - end). 
- -t_spawn_changes_reader() -> - ?_test(begin - Pid = start_changes_reader(?DBNAME, 3), - ?assert(erlang:is_process_alive(Pid)), - ChArgs = kill_mock_changes_reader_and_get_its_args(Pid), - ?assertEqual({self(), ?DBNAME}, ChArgs), - ?assert(meck:validate(couch_db)), - ?assert(meck:validate(couch_changes)), - ?assert(meck:called(couch_db, open_int, [?DBNAME, [?CTX, sys_db]])), - ?assert( - meck:called(couch_changes, handle_db_changes, [ - #changes_args{ - include_docs = true, - since = 3, - feed = "normal", - timeout = infinity - }, - {json_req, null}, - db - ]) - ) - end). - -t_changes_reader_cb_change() -> - ?_test(begin - {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []), - Change = change_row(<<"blah">>), - ChArg = {change, Change, ignore}, - {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}), - ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig])), - unlink(Pid), - exit(Pid, kill) - end). - -t_changes_reader_cb_stop() -> - ?_test(begin - {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []), - ChArg = {stop, 11}, - {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}), - % We checkpoint on stop, check if checkpointed at correct sequence - #state{tid = Tid} = sys:get_state(Pid), - ?assertEqual([{?DBNAME, 11, false}], ets:tab2list(Tid)), - unlink(Pid), - exit(Pid, kill) - end). - -t_changes_reader_cb_other() -> - ?_assertEqual(acc, changes_reader_cb(other, chtype, acc)). 
- -t_handle_call_resume_scan_no_chfeed_no_ets_entry() -> - ?_test(begin - Tid = mock_ets(), - State = mock_state(Tid), - RState = resume_scan(?DBNAME, State), - % Check if inserted checkpoint entry in ets starting at 0 - ?assertEqual([{?DBNAME, 0, false}], ets:tab2list(Tid)), - % Check if called db_found callback - ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])), - % Check if started a change reader - [{?DBNAME, Pid}] = RState#state.pids, - ChArgs = kill_mock_changes_reader_and_get_its_args(Pid), - ?assertEqual({self(), ?DBNAME}, ChArgs), - ?assert( - meck:called(couch_changes, handle_db_changes, [ - #changes_args{ - include_docs = true, - since = 0, - feed = "normal", - timeout = infinity - }, - {json_req, null}, - db - ]) - ), - ets:delete(Tid) - end). - -t_handle_call_resume_scan_chfeed_no_ets_entry() -> - ?_test(begin - Tid = mock_ets(), - Pid = start_changes_reader(?DBNAME, 0), - State = mock_state(Tid, Pid), - resume_scan(?DBNAME, State), - % Check ets checkpoint is set to 0 and rescan = true - ?assertEqual([{?DBNAME, 0, true}], ets:tab2list(Tid)), - ets:delete(Tid), - kill_mock_changes_reader_and_get_its_args(Pid) - end). - -t_handle_call_resume_scan_chfeed_ets_entry() -> - ?_test(begin - Tid = mock_ets(), - true = ets:insert(Tid, [{?DBNAME, 2, false}]), - Pid = start_changes_reader(?DBNAME, 1), - State = mock_state(Tid, Pid), - resume_scan(?DBNAME, State), - % Check ets checkpoint is set to same endseq but rescan = true - ?assertEqual([{?DBNAME, 2, true}], ets:tab2list(Tid)), - ets:delete(Tid), - kill_mock_changes_reader_and_get_its_args(Pid) - end). 
- -t_handle_call_resume_scan_no_chfeed_ets_entry() -> - ?_test(begin - Tid = mock_ets(), - true = ets:insert(Tid, [{?DBNAME, 1, true}]), - State = mock_state(Tid), - RState = resume_scan(?DBNAME, State), - % Check if reset rescan to false but kept same endseq - ?assertEqual([{?DBNAME, 1, false}], ets:tab2list(Tid)), - % Check if started a change reader - [{?DBNAME, Pid}] = RState#state.pids, - ChArgs = kill_mock_changes_reader_and_get_its_args(Pid), - ?assertEqual({self(), ?DBNAME}, ChArgs), - ?assert( - meck:called(couch_changes, handle_db_changes, [ - #changes_args{ - include_docs = true, - since = 1, - feed = "normal", - timeout = infinity - }, - {json_req, null}, - db - ]) - ), - ets:delete(Tid) - end). - -t_start_link() -> - ?_test(begin - {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, []), - ?assert(is_pid(Pid)), - ?assertMatch( - #state{ - mod = ?MOD, - suffix = ?SUFFIX, - ctx = nil, - pids = [], - skip_ddocs = false +t_handle_call_change(_) -> + State = mock_state(), + Change = change_row(<<"blah">>), + handle_call_ok({change, ?DBNAME, Change}, State), + ?assert(meck:validate(?MOD)), + ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig])). + +t_handle_call_change_filter_design_docs(_) -> + State0 = mock_state(), + State = State0#state{skip_ddocs = true}, + Change = change_row(<<"_design/blah">>), + handle_call_ok({change, ?DBNAME, Change}, State), + ?assert(meck:validate(?MOD)), + ?assertNot(meck:called(?MOD, db_change, [?DBNAME, Change, zig])). + +t_handle_call_checkpoint_new(_) -> + Tid = mock_ets(), + State = mock_state(Tid, cpid), + handle_call_ok({checkpoint, ?DBNAME, 1}, cpid, State), + ?assertEqual([{?DBNAME, 1, false, cpid}], ets:tab2list(Tid)), + ets:delete(Tid). + +t_handle_call_checkpoint_existing(_) -> + Tid = mock_ets(), + State = mock_state(Tid, cpid), + handle_call_ok({checkpoint, ?DBNAME, 2}, cpid, State), + ?assertEqual([{?DBNAME, 2, false, cpid}], ets:tab2list(Tid)), + ets:delete(Tid). 
+ +t_handle_call_checkpoint_stale_changes_pid(_) -> + Tid = mock_ets(), + State = mock_state(Tid, cpid), + handle_call_ok({checkpoint, ?DBNAME, 42}, other, State), + ?assertEqual([{?DBNAME, 0, false, cpid}], ets:tab2list(Tid)), + ets:delete(Tid). + +t_handle_info_created(_) -> + Tid = mock_ets(), + State = mock_state(Tid), + handle_info_check({'$couch_event', ?DBNAME, created}, State), + ?assert(meck:validate(?MOD)), + ?assert(meck:called(?MOD, db_created, [?DBNAME, zig])). + +t_handle_info_deleted(_) -> + State = mock_state(), + handle_info_check({'$couch_event', ?DBNAME, deleted}, State), + ?assert(meck:validate(?MOD)), + ?assert(meck:called(?MOD, db_deleted, [?DBNAME, zig])). + +t_handle_info_updated(_) -> + Tid = mock_ets(), + State = mock_state(Tid), + handle_info_check({'$couch_event', ?DBNAME, updated}, State), + ?assert(meck:validate(?MOD)), + ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])). + +t_handle_info_other_event(_) -> + State = mock_state(), + handle_info_check({'$couch_event', ?DBNAME, somethingelse}, State), + ?assertNot(meck:called(?MOD, db_created, [?DBNAME, somethingelse])), + ?assertNot(meck:called(?MOD, db_deleted, [?DBNAME, somethingelse])), + ?assertNot(meck:called(?MOD, db_found, [?DBNAME, somethingelse])). + +t_handle_info_created_other_db(_) -> + State = mock_state(), + handle_info_check({'$couch_event', <<"otherdb">>, created}, State), + ?assertNot(meck:called(?MOD, db_created, [?DBNAME, zig])). + +t_handle_info_scanner_exit_normal(_) -> + Res = handle_info({'EXIT', spid, normal}, mock_state()), + ?assertMatch({noreply, _}, Res), + {noreply, RState} = Res, + ?assertEqual(nil, RState#state.scanner). + +t_handle_info_scanner_crashed(_) -> + Res = handle_info({'EXIT', spid, oops}, mock_state()), + ?assertMatch({stop, {scanner_died, oops}, _State}, Res). + +t_handle_info_event_server_exited(_) -> + Res = handle_info({'DOWN', esref, type, espid, reason}, mock_state()), + ?assertMatch({stop, {couch_event_server_died, reason}, _}, Res). 
+ +t_handle_info_unknown_pid_exited(_) -> + State0 = mock_state(), + Res0 = handle_info({'EXIT', somepid, normal}, State0), + ?assertMatch({stop, {unexpected_exit, somepid, normal}, State0}, Res0), + State1 = mock_state(), + Res1 = handle_info({'EXIT', somepid, oops}, State1), + ?assertMatch({stop, {unexpected_exit, somepid, oops}, State1}, Res1). + +t_handle_info_change_feed_exited(_) -> + Tid0 = mock_ets(), + State0 = mock_state(Tid0, cpid), + Res0 = handle_info({'EXIT', cpid, normal}, State0), + ?assertMatch({noreply, _}, Res0), + {noreply, RState0} = Res0, + ?assertEqual(#{}, RState0#state.pids), + ets:delete(Tid0), + Tid1 = mock_ets(), + State1 = mock_state(Tid1, cpid), + Res1 = handle_info({'EXIT', cpid, oops}, State1), + ?assertMatch({noreply, _}, Res1), + {noreply, RState1} = Res1, + ?assertEqual(#{}, RState1#state.pids), + ets:delete(Tid1). + +t_handle_info_change_feed_exited_and_need_rescan(_) -> + Tid = mock_ets(), + State = mock_state(Tid, cpid), + true = ets:insert(Tid, {?DBNAME, 1, true, cpid}), + Res = handle_info({'EXIT', cpid, normal}, State), + ?assertMatch({noreply, _}, Res), + {noreply, RState} = Res, + % a mock change feed process should be running + [{Pid, ?DBNAME}] = maps:to_list(RState#state.pids), + ?assert(is_pid(Pid)), + % rescan flag should have been reset to false + ?assertEqual([{?DBNAME, 1, false, Pid}], ets:tab2list(Tid)), + ChArgs = kill_mock_changes_reader_and_get_its_args(Pid), + ?assertEqual({self(), ?DBNAME}, ChArgs), + ets:delete(Tid). 
+ +t_spawn_changes_reader(_) -> + Pid = start_changes_reader(?DBNAME, 3), + ?assert(erlang:is_process_alive(Pid)), + ChArgs = kill_mock_changes_reader_and_get_its_args(Pid), + ?assertEqual({self(), ?DBNAME}, ChArgs), + ?assert(meck:validate(couch_db)), + ?assert(meck:validate(couch_changes)), + ?assert(meck:called(couch_db, open_int, [?DBNAME, [?CTX, sys_db]])), + ?assert( + meck:called(couch_changes, handle_db_changes, [ + #changes_args{ + include_docs = true, + since = 3, + feed = "normal", + timeout = infinity }, - sys:get_state(Pid) - ), - unlink(Pid), - exit(Pid, kill), - ?assert(meck:called(couch_event, register_all, [Pid])) - end). - -t_start_link_no_ddocs() -> - ?_test(begin - {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, [skip_ddocs]), - ?assert(is_pid(Pid)), - ?assertMatch( - #state{ - mod = ?MOD, - suffix = ?SUFFIX, - ctx = nil, - pids = [], - skip_ddocs = true + {json_req, null}, + db + ]) + ). + +t_changes_reader_cb_change(_) -> + {ok, Pid} = start_link(?SUFFIX, ?MOD, zig, []), + Change = change_row(<<"blah">>), + ChArg = {change, Change, ignore}, + {Pid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {Pid, ?DBNAME}), + ?assert(meck:called(?MOD, db_change, [?DBNAME, Change, zig])), + unlink(Pid), + exit(Pid, kill). + +t_changes_reader_cb_stop(_) -> + {ok, ServerPid} = start_link(?SUFFIX, ?MOD, zig, []), + #state{tid = Tid} = sys:get_state(ServerPid), + ChPid = self(), + ets:insert(Tid, {?DBNAME, 1, false, ChPid}), + sys:replace_state(ServerPid, fun(#state{} = OldSt) -> + OldSt#state{pids = #{ChPid => ?DBNAME}} + end), + ChArg = {stop, 11}, + {ServerPid, ?DBNAME} = changes_reader_cb(ChArg, chtype, {ServerPid, ?DBNAME}), + % We checkpoint on stop, check if checkpointed at correct sequence + #state{tid = Tid, pids = Pids} = sys:get_state(ServerPid), + ?assertMatch(#{ChPid := ?DBNAME}, Pids), + ?assertEqual([{?DBNAME, 11, false, ChPid}], ets:tab2list(Tid)), + unlink(ServerPid), + exit(ServerPid, kill). 
+ +t_changes_reader_cb_other(_) -> + ?assertEqual(acc, changes_reader_cb(other, chtype, acc)). + +t_handle_call_resume_scan_no_chfeed_no_ets_entry(_) -> + Tid = mock_ets(), + State = mock_state(Tid), + RState = resume_scan(?DBNAME, State), + % Check if called db_found callback + ?assert(meck:called(?MOD, db_found, [?DBNAME, zig])), + % Check if started a change reader + [{Pid, ?DBNAME}] = maps:to_list(RState#state.pids), + % Check if inserted checkpoint entry in ets starting at 0 + ?assertEqual([{?DBNAME, 0, false, Pid}], ets:tab2list(Tid)), + ChArgs = kill_mock_changes_reader_and_get_its_args(Pid), + ?assertEqual({self(), ?DBNAME}, ChArgs), + ?assert( + meck:called(couch_changes, handle_db_changes, [ + #changes_args{ + include_docs = true, + since = 0, + feed = "normal", + timeout = infinity + }, + {json_req, null}, + db + ]) + ), + ets:delete(Tid). + +t_handle_call_resume_scan_chfeed_no_ets_entry(_) -> + Tid = mock_ets(), + Pid = start_changes_reader(?DBNAME, 0), + State = mock_state(Tid, Pid), + resume_scan(?DBNAME, State), + % Check ets checkpoint is set to 0 and rescan = true + ?assertEqual([{?DBNAME, 0, true, Pid}], ets:tab2list(Tid)), + ets:delete(Tid), + kill_mock_changes_reader_and_get_its_args(Pid). + +t_handle_call_resume_scan_chfeed_ets_entry(_) -> + Tid = mock_ets(), + Pid = start_changes_reader(?DBNAME, 1), + State = mock_state(Tid, Pid), + true = ets:insert(Tid, [{?DBNAME, 2, false, Pid}]), + resume_scan(?DBNAME, State), + % Check ets checkpoint is set to same endseq but rescan = true + ?assertEqual([{?DBNAME, 2, true, Pid}], ets:tab2list(Tid)), + ets:delete(Tid), + kill_mock_changes_reader_and_get_its_args(Pid). 
+ +t_handle_call_resume_scan_no_chfeed_ets_entry(_) -> + Tid = mock_ets(), + true = ets:insert(Tid, [{?DBNAME, 1, true, undefined}]), + State = mock_state(Tid), + RState = resume_scan(?DBNAME, State), + % Check if started a change reader + [{Pid, ?DBNAME}] = maps:to_list(RState#state.pids), + % Check if reset rescan to false but kept same endseq + ?assertEqual([{?DBNAME, 1, false, Pid}], ets:tab2list(Tid)), + ChArgs = kill_mock_changes_reader_and_get_its_args(Pid), + ?assertEqual({self(), ?DBNAME}, ChArgs), + ?assert( + meck:called(couch_changes, handle_db_changes, [ + #changes_args{ + include_docs = true, + since = 1, + feed = "normal", + timeout = infinity }, - sys:get_state(Pid) - ), - unlink(Pid), - exit(Pid, kill) - end). + {json_req, null}, + db + ]) + ), + ets:delete(Tid). + +t_start_link(_) -> + {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, []), + ?assert(is_pid(Pid)), + ?assertMatch( + #state{ + mod = ?MOD, + suffix = ?SUFFIX, + ctx = nil, + pids = #{}, + skip_ddocs = false + }, + sys:get_state(Pid) + ), + unlink(Pid), + exit(Pid, kill), + ?assert(meck:called(couch_event, register_all, [Pid])). + +t_start_link_no_ddocs(_) -> + {ok, Pid} = start_link(?SUFFIX, ?MOD, nil, [skip_ddocs]), + ?assert(is_pid(Pid)), + ?assertMatch( + #state{ + mod = ?MOD, + suffix = ?SUFFIX, + ctx = nil, + pids = #{}, + skip_ddocs = true + }, + sys:get_state(Pid) + ), + unlink(Pid), + exit(Pid, kill). -t_misc_gen_server_callbacks() -> - ?_test(begin - ?assertEqual(ok, terminate(reason, state)) - end). +t_misc_gen_server_callbacks(_) -> + ?assertEqual(ok, terminate(reason, state)). scan_dbs_test_() -> { @@ -734,45 +692,60 @@ scan_dbs_test_() -> fabric:delete_db(GlobalDb, [?CTX]), test_util:stop_couch(Ctx) end, - {with, [ - fun t_find_shard/1, - fun t_shard_not_found/1, - fun t_pass_local/1, - fun t_fail_local/1 - ]} + with([ + ?TDEF(t_find_shard), + ?TDEF(t_shard_not_found), + ?TDEF(t_pass_local), + ?TDEF(t_fail_local), + ?TDEF(t_scan_all_dbs) + ]) }. 
t_find_shard({_, DbName, _}) -> - ?_test(begin - ?assertEqual(2, length(local_shards(DbName))) - end). + ?assertEqual(2, length(local_shards(DbName))). t_shard_not_found(_) -> - ?_test(begin - ?assertEqual([], local_shards(?tempdb())) - end). + ?assertEqual([], local_shards(?tempdb())). t_pass_local({_, _, LocalDb}) -> - ?_test(begin - scan_local_db(self(), LocalDb), - receive - {'$gen_cast', Msg} -> - ?assertEqual(Msg, {resume_scan, LocalDb}) - after 0 -> - ?assert(false) - end - end). + scan_local_db(self(), LocalDb), + receive + {'$gen_cast', Msg} -> + ?assertEqual(Msg, {resume_scan, LocalDb}) + after 0 -> + ?assert(false) + end. t_fail_local({_, _, LocalDb}) -> - ?_test(begin - scan_local_db(self(), <<"some_other_db">>), - receive - {'$gen_cast', Msg} -> - ?assertNotEqual(Msg, {resume_scan, LocalDb}) - after 0 -> - ?assert(true) - end - end). + scan_local_db(self(), <<"some_other_db">>), + receive + {'$gen_cast', Msg} -> + ?assertNotEqual(Msg, {resume_scan, LocalDb}) + after 0 -> + ?assert(true) + end. + +t_scan_all_dbs({_, GlobalDb, _}) -> + scan_all_dbs(self(), GlobalDb), + ?assertMatch( + [ + {'$gen_cast', {resume_scan, <<"shards/00000000-7fffffff/", _/binary>>}}, + {'$gen_cast', {resume_scan, <<"shards/80000000-ffffffff/", _/binary>>}} + ], + lists:sort(flush([])) + ). + +flush(Acc) -> + receive + Msg -> + NewMsg = [Msg | Acc], + case length(NewMsg) >= 2 of + true -> NewMsg; + false -> flush(NewMsg) + end + after 1000 -> + Acc + end. % Test helper functions @@ -826,7 +799,7 @@ mock_state() -> suffix = ?SUFFIX, event_server = esref, scanner = spid, - pids = [] + pids = #{} }. mock_state(Ets) -> @@ -835,7 +808,8 @@ mock_state(Ets) -> mock_state(Ets, Pid) -> State = mock_state(Ets), - State#state{pids = [{?DBNAME, Pid}]}. + ets:insert(State#state.tid, {?DBNAME, 0, false, Pid}), + State#state{pids = #{Pid => ?DBNAME}}. change_row(Id) when is_binary(Id) -> {[ @@ -846,7 +820,11 @@ change_row(Id) when is_binary(Id) -> ]}. 
handle_call_ok(Msg, State) -> - ?assertMatch({reply, ok, _}, handle_call(Msg, from, State)). + handle_call_ok(Msg, from, State). + +handle_call_ok(Msg, FromPid, State) -> + FromTag = make_ref(), + ?assertMatch({reply, ok, _}, handle_call(Msg, {FromPid, FromTag}, State)). handle_info_check(Msg, State) -> ?assertMatch({noreply, _}, handle_info(Msg, State)).
