This is an automated email from the ASF dual-hosted git repository.
nickva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
new db961c080 Fix cluster index and process cleanup
db961c080 is described below
commit db961c0809cf8e201824ff0fbaa952f0a3fc72e2
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Apr 27 21:53:37 2026 -0400
Fix cluster index and process cleanup
Previously, as described in #5980, we didn't perform a thorough index cleanup
when ddocs changed. We only cleaned up on nodes where the design docs were
located. That was true for a n=3 db and an n=3, db but may not be true in
general in a cluster.
To fix the issue, run a small gen_server responsible for performing cluster
index cleanup. To avoid spawning Q*N jobs, deduplicate the requests by delaying
for up to 30 seconds per clustered db. For cleanup, reuse and call the already
existing fabric index file cleanup machinery. That accomplishes two things:
- Starts a quicker index file cleanup. Previously we only did this during
smoosh compaction runs. The view files could linger for a while until
compaction in smoosh would be triggered.
- Cleaning search index files also stops indexes on their (Java) side, so
index file clean-up does "double duty", so to speak, when it comes to index
shutdown.
Fix https://github.com/apache/couchdb/issues/5980
---
rel/overlay/etc/default.ini | 4 +
src/couch/src/couch_secondary_sup.erl | 3 +-
src/couch_index/src/couch_index_cleanup.erl | 98 ++++++++
src/couch_index/src/couch_index_server.erl | 68 ++----
.../test/eunit/couch_index_ddoc_updated_tests.erl | 178 ---------------
src/couch_mrview/src/couch_mrview_cleanup.erl | 40 +++-
src/couch_mrview/src/couch_mrview_util.erl | 12 +-
.../test/eunit/couch_mrview_cleanup_tests.erl | 252 +++++++++++++++++++++
.../test/eunit/couch_mrview_util_tests.erl | 6 +-
9 files changed, 427 insertions(+), 234 deletions(-)
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 17b5eee48..79026dea3 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -158,6 +158,10 @@ view_index_dir = {{view_index_dir}}
;
;time_seq_min_time = 1754006400
+; Clustered index cleanup deduplication hold-off. How long to wait before
+; running clean up per clustered db.
+;index_cleanup_delay_msec = 30000
+
[bt_engine_cache]
; Memory used for btree engine cache. This is a cache for top levels of
; database btrees (id tree, seq tree) and a few terms from the db header. Value
diff --git a/src/couch/src/couch_secondary_sup.erl
b/src/couch/src/couch_secondary_sup.erl
index 766235d5d..7ee9cab78 100644
--- a/src/couch/src/couch_secondary_sup.erl
+++ b/src/couch/src/couch_secondary_sup.erl
@@ -28,7 +28,8 @@ init([]) ->
{query_servers, {couch_proc_manager, start_link, []}},
{vhosts, {couch_httpd_vhost, start_link, []}},
{uuids, {couch_uuids, start, []}},
- {disk_manager, {couch_disk_monitor, start_link, []}}
+ {disk_manager, {couch_disk_monitor, start_link, []}},
+ {couch_index_cleanup, {couch_index_cleanup, start_link, []}}
] ++ couch_index_servers(),
MaybeHttp =
diff --git a/src/couch_index/src/couch_index_cleanup.erl
b/src/couch_index/src/couch_index_cleanup.erl
new file mode 100644
index 000000000..be37c5654
--- /dev/null
+++ b/src/couch_index/src/couch_index_cleanup.erl
@@ -0,0 +1,98 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_index_cleanup).
+-behaviour(gen_server).
+
+-export([
+ start_link/0,
+ schedule/1
+]).
+
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2
+]).
+
+-export([
+ handle_db_event/3
+]).
+
+-define(DEFAULT_DELAY_MSEC, 30000).
+
+-record(st, {
+ pending = #{} :: #{binary() => reference()}
+}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+schedule(DbName) when is_binary(DbName) ->
+ gen_server:cast(?MODULE, {schedule, DbName, fanout}).
+
+init([]) ->
+ {ok, _} = couch_event:link_listener(?MODULE, handle_db_event, nil,
[all_dbs]),
+ {ok, #st{}}.
+
+handle_call(Msg, _From, #st{} = St) ->
+ {stop, {invalid_call, Msg}, {invalid_call, Msg}, St}.
+
+handle_cast({schedule, DbName, Mode}, #st{pending = Pending} = St) ->
+ case maps:is_key(DbName, Pending) of
+ true ->
+ {noreply, St};
+ false ->
+ case Mode of
+ fanout -> fanout(DbName);
+ no_fanout -> ok
+ end,
+ TRef = erlang:send_after(delay_msec(), self(), {run_cleanup,
DbName}),
+ {noreply, St#st{pending = Pending#{DbName => TRef}}}
+ end;
+handle_cast(Msg, St) ->
+ {stop, {invalid_cast, Msg}, St}.
+
+handle_info({run_cleanup, DbName}, #st{pending = Pending} = St) ->
+ spawn(fun() ->
+ try
+ fabric:cleanup_index_files_this_node(DbName)
+ catch
+ Class:Reason:Stack ->
+ WArgs = [?MODULE, DbName, Class, Reason, Stack],
+ couch_log:warning("~p: cleanup ~s failed ~p:~p~n~p", WArgs)
+ end
+ end),
+ {noreply, St#st{pending = maps:remove(DbName, Pending)}};
+handle_info(Msg, St) ->
+ {stop, {invalid_info, Msg}, St}.
+
+handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, _DDocId}, St)
->
+ % Clustered dbs only
+ schedule(mem3:dbname(DbName)),
+ {ok, St};
+handle_db_event(_DbName, _Event, St) ->
+ {ok, St}.
+
+fanout(DbName) ->
+ try mem3:shards(DbName) of
+ Shards ->
+ Nodes = lists:usort([mem3:node(S) || S <- Shards]) -- [node()],
+ Args = {schedule, DbName, no_fanout},
+ lists:foreach(fun(N) -> gen_server:cast({?MODULE, N}, Args) end,
Nodes)
+ catch
+ _:_ -> ok
+ end.
+
+delay_msec() ->
+ config:get_integer("couchdb", "index_cleanup_delay_msec",
?DEFAULT_DELAY_MSEC).
diff --git a/src/couch_index/src/couch_index_server.erl
b/src/couch_index/src/couch_index_server.erl
index d4593ee0d..82ceb1541 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -23,6 +23,9 @@
-export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1,
openers/1]).
-export([aggregate_queue_len/0, names/0]).
+% Cluster cleanup helpers (used by couch_mrview_cleanup)
+-export([shard_entries/1, shard_index_pid/2, forget_ddoc_binding/3]).
+
% Exported for callbacks
-export([
handle_config_change/5,
@@ -358,51 +361,9 @@ handle_db_event(DbName, created, St) ->
handle_db_event(DbName, deleted, St) ->
gen_server:cast(St#st.server_name, {reset_indexes, DbName}),
{ok, St};
-handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St)
->
- %% this handle_db_event function must not crash (or it takes down the
couch_index_server)
- try
- DDocResult = couch_util:with_db(DbName, fun(Db) ->
- couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX])
- end),
- LocalShards = mem3:local_shards(mem3:dbname(DbName)),
- DbShards = [mem3:name(Sh) || Sh <- LocalShards],
- lists:foreach(
- fun(DbShard) ->
- lists:foreach(
- fun({_DbShard, {_DDocId, Sig}}) ->
- % check if there are other ddocs with the same Sig for
the same db
- SigDDocs = ets:match_object(St#st.by_db, {DbShard,
{'$1', Sig}}),
- if
- length(SigDDocs) > 1 ->
- % remove records from by_db for this DDoc
- Args = [DbShard, DDocId, Sig],
- gen_server:cast(St#st.server_name,
{rem_from_ets, Args});
- true ->
- % single DDoc with this Sig - close
couch_index processes
- case ets:lookup(St#st.by_sig, {DbShard, Sig})
of
- [{_, IndexPid}] ->
- (catch gen_server:cast(
- IndexPid, {ddoc_updated,
DDocResult}
- ));
- [] ->
- []
- end
- end
- end,
- ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}})
- )
- end,
- DbShards
- ),
- {ok, St}
- catch
- Class:Reason:Stack ->
- couch_log:warning("~p: handle_db_event ~p for db ~p, reason ~p,
stack ~p", [
- ?MODULE, Class, DbName, Reason, Stack
- ]),
- gen_server:cast(St#st.server_name, {rem_from_ets, [DbName,
Reason]}),
- {ok, St}
- end;
+handle_db_event(<<"shards/", _/binary>>, {ddoc_updated, _DDocId}, St) ->
+ %% Cluster dbs cleanup is handled by couch_index_cleanup
+ {ok, St};
handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
lists:foreach(
fun({_DbName, {_DDocId, Sig}}) ->
@@ -437,6 +398,23 @@ by_db(Arg) ->
openers(Arg) ->
name("couchdb_indexes_openers", Arg).
+% Return {DDocId, Sig} entries for a shard. Used by cluster cleanup
+shard_entries(ShardName) when is_binary(ShardName) ->
+ Rows = ets:match_object(by_db(ShardName), {ShardName, '_'}),
+ [Entry || {_ShardName, Entry} <- Rows].
+
+% Return indexer Pid for {ShardName, Sig} or not_found
+shard_index_pid(ShardName, Sig) when is_binary(ShardName) ->
+ case ets:lookup(by_sig(ShardName), {ShardName, Sig}) of
+ [{_, Pid}] when is_pid(Pid) -> {ok, Pid};
+ _ -> not_found
+ end.
+
+% Remove {ShardName, {DDocId, Sig}} row from by_db. The indexer process is left
+% as is. This is for removing one of the ddocs pointing to the same sig
+forget_ddoc_binding(ShardName, DDocId, Sig) when is_binary(ShardName) ->
+ gen_server:cast(server_name(ShardName), {rem_from_ets, [ShardName, DDocId,
Sig]}).
+
name(BaseName, Arg) when is_list(Arg) ->
name(BaseName, ?l2b(Arg));
name(BaseName, Arg) when is_binary(Arg) ->
diff --git a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
deleted file mode 100644
index 6b7fe5a4a..000000000
--- a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl
+++ /dev/null
@@ -1,178 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_index_ddoc_updated_tests).
-
--include_lib("couch/include/couch_eunit.hrl").
--include_lib("couch/include/couch_db.hrl").
-
-start() ->
- fake_index(),
- Ctx = test_util:start_couch([mem3, fabric]),
- DbName = ?tempdb(),
- ok = fabric:create_db(DbName, [?ADMIN_CTX]),
- {Ctx, DbName}.
-
-stop({Ctx, DbName}) ->
- meck:unload(test_index),
- ok = fabric:delete_db(DbName, [?ADMIN_CTX]),
- DbDir = config:get("couchdb", "database_dir", "."),
- WaitFun = fun() ->
- filelib:fold_files(
- DbDir,
- <<".*", DbName/binary, "\.[0-9]+.*">>,
- true,
- fun(_F, _A) -> wait end,
- ok
- )
- end,
- ok = test_util:wait(WaitFun),
- test_util:stop_couch(Ctx),
- ok.
-
-ddoc_update_test_() ->
- {
- "Check ddoc update actions",
- {
- setup,
- fun start/0,
- fun stop/1,
- fun check_all_indexers_exit_on_ddoc_change/1
- }
- }.
-
-check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) ->
- ?_test(begin
- [DbShard1 | RestDbShards] = lists:map(
- fun(Sh) ->
- {ok, ShardDb} = couch_db:open(mem3:name(Sh), []),
- ShardDb
- end,
- mem3:local_shards(mem3:dbname(DbName))
- ),
-
- % create a DDoc on Db1
- DDocID = <<"idx_name">>,
- DDocJson = couch_doc:from_json_obj(
- {[
- {<<"_id">>, DDocID},
- {<<"value">>, 1}
- ]}
- ),
- {ok, _Rev} = couch_db:update_doc(DbShard1, DDocJson, []),
- {ok, DbShard} = couch_db:reopen(DbShard1),
- {ok, DDoc} = couch_db:open_doc(
- DbShard, DDocID, [ejson_body, ?ADMIN_CTX]
- ),
- DbShards = [DbShard | RestDbShards],
- N = length(DbShards),
-
- % run couch_index process for each shard database
- ok = meck:reset(test_index),
- lists:foreach(
- fun(ShardDb) ->
- couch_index_server:get_index(test_index, ShardDb, DDoc)
- end,
- DbShards
- ),
-
- IndexesBefore = get_indexes_by_ddoc(DDocID, N),
- ?assertEqual(N, length(IndexesBefore)),
-
- AliveBefore = lists:filter(fun is_process_alive/1, IndexesBefore),
- ?assertEqual(N, length(AliveBefore)),
-
- % update ddoc
- DDocJson2 = couch_doc:from_json_obj(
- {[
- {<<"_id">>, DDocID},
- {<<"value">>, 2},
- {<<"_rev">>, couch_doc:rev_to_str(DDoc#doc.revs)}
- ]}
- ),
- {ok, _} = couch_db:update_doc(DbShard, DDocJson2, []),
-
- % assert that all index processes exit after ddoc updated
- ok = meck:reset(test_index),
- lists:foreach(
- fun(I) ->
- couch_index_server:handle_db_event(
- couch_db:name(DbShard),
- {ddoc_updated, DDocID},
- {st, "", couch_index_server:server_name(I),
couch_index_server:by_sig(I),
- couch_index_server:by_pid(I),
couch_index_server:by_db(I),
- couch_index_server:openers(I)}
- )
- end,
- seq()
- ),
-
- ok = meck:wait(N, test_index, init, ['_', '_'], 5000),
- IndexesAfter = get_indexes_by_ddoc(DDocID, 0),
- ?assertEqual(0, length(IndexesAfter)),
-
- %% assert that previously running indexes are gone
- AliveAfter = lists:filter(fun is_process_alive/1, IndexesBefore),
- ?assertEqual(0, length(AliveAfter)),
- ok
- end).
-
-fake_index() ->
- ok = meck:new([test_index], [non_strict]),
- ok = meck:expect(test_index, init, fun(Db, DDoc) ->
- {ok, {couch_db:name(Db), DDoc}}
- end),
- ok = meck:expect(test_index, open, fun(_Db, State) ->
- {ok, State}
- end),
- ok = meck:expect(test_index, get, fun
- (db_name, {DbName, _DDoc}) ->
- DbName;
- (idx_name, {_DbName, DDoc}) ->
- DDoc#doc.id;
- (signature, {_DbName, DDoc}) ->
- couch_hash:md5_hash(term_to_binary(DDoc));
- (update_seq, Seq) ->
- Seq
- end),
- ok = meck:expect(test_index, shutdown, ['_'], ok).
-
-get_indexes_by_ddoc(DDocID, N) ->
- Indexes = test_util:wait(fun() ->
- Indxs = lists:flatmap(
- fun(I) ->
- ets:match_object(
- couch_index_server:by_db(I), {'$1', {DDocID, '$2'}}
- )
- end,
- seq()
- ),
- case length(Indxs) == N of
- true ->
- Indxs;
- false ->
- wait
- end
- end),
- lists:foldl(
- fun({DbName, {_DDocID, Sig}}, Acc) ->
- case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig})
of
- [{_, Pid}] -> [Pid | Acc];
- _ -> Acc
- end
- end,
- [],
- Indexes
- ).
-
-seq() ->
- lists:seq(1, couch_index_server:num_servers()).
diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl
b/src/couch_mrview/src/couch_mrview_cleanup.erl
index e8a2833a7..fab449ee5 100644
--- a/src/couch_mrview/src/couch_mrview_cleanup.erl
+++ b/src/couch_mrview/src/couch_mrview_cleanup.erl
@@ -14,7 +14,8 @@
-export([
run/1,
- cleanup/2
+ cleanup/2,
+ cleanup_processes/2
]).
run(Db) ->
@@ -23,7 +24,8 @@ run(Db) ->
{ok, Db1} = couch_db:reopen(Db),
Sigs = couch_mrview_util:get_signatures(Db1),
ok = cleanup_purges(Db1, Sigs, Checkpoints),
- ok = cleanup_indices(Sigs, Indices).
+ ok = cleanup_indices(Sigs, Indices),
+ ok = cleanup_processes(Db1, Sigs).
% erpc endpoint for fabric_index_cleanup:cleanup_indexes/2
%
@@ -34,7 +36,8 @@ cleanup(Dbs, #{} = Sigs) ->
Indices = couch_mrview_util:get_index_files(Db),
Checkpoints = couch_mrview_util:get_purge_checkpoints(Db),
ok = cleanup_purges(Db, Sigs, Checkpoints),
- ok = cleanup_indices(Sigs, Indices)
+ ok = cleanup_indices(Sigs, Indices),
+ ok = cleanup_processes(Db, Sigs)
end,
Dbs
)
@@ -43,6 +46,37 @@ cleanup(Dbs, #{} = Sigs) ->
ok
end.
+% Clean up indexer processes whose signature is no longer in the valid set.
+%
+cleanup_processes(ShardName, #{} = Sigs) when is_binary(ShardName) ->
+ Entries = couch_index_server:shard_entries(ShardName),
+ lists:foreach(
+ fun({DDocId, Sig}) ->
+ HexSig = couch_util:to_hex_bin(Sig),
+ case maps:find(HexSig, Sigs) of
+ {ok, ValidDDocs} ->
+ % Sig in use. If DDocId doesn't reference it any
+ % longer drop the stale by_db row
+ case maps:is_key(DDocId, ValidDDocs) of
+ true ->
+ ok;
+ false ->
+ couch_index_server:forget_ddoc_binding(ShardName,
DDocId, Sig)
+ end;
+ error ->
+ case couch_index_server:shard_index_pid(ShardName, Sig) of
+ {ok, IndexPid} ->
+ (catch gen_server:cast(IndexPid, {ddoc_updated,
{not_found, deleted}}));
+ not_found ->
+ ok
+ end
+ end
+ end,
+ Entries
+ );
+cleanup_processes(Db, #{} = Sigs) ->
+ cleanup_processes(couch_db:name(Db), Sigs).
+
cleanup_purges(Db, Sigs, Checkpoints) ->
couch_index_util:cleanup_purges(Db, Sigs, Checkpoints).
diff --git a/src/couch_mrview/src/couch_mrview_util.erl
b/src/couch_mrview/src/couch_mrview_util.erl
index 48138bf76..17f6db34d 100644
--- a/src/couch_mrview/src/couch_mrview_util.erl
+++ b/src/couch_mrview/src/couch_mrview_util.erl
@@ -112,17 +112,19 @@ get_signatures(Db) ->
DDocs1 = lists:foldl(FoldFun, [], DDocs),
get_signatures_from_ddocs(DbName, DDocs1).
-% From a list of design #doc{} records returns signatures map: #{Sig => true}
-% This will be valid signatures of views we expect to run and build on this
-% node.
+% From a list of design #doc{} records returns the map
+% #{Sig => #{DDocId => true}}. The keys are the valid sig of views
+% and inner maps are ddocs referencing those sigs (we can have multiple
+% ddocs referencing the same sig).
get_signatures_from_ddocs(DbName, DDocs) when is_list(DDocs) ->
- FoldFun = fun(#doc{} = Doc, Acc) ->
+ FoldFun = fun(#doc{id = DDocId} = Doc, Acc) ->
try ddoc_to_mrst(DbName, Doc) of
{ok, Mrst} ->
case couch_mrview_util:mrst_has_valid_views(Mrst) of
true ->
Sig = couch_util:to_hex_bin(Mrst#mrst.sig),
- Acc#{Sig => true};
+ Inner = maps:get(Sig, Acc, #{}),
+ Acc#{Sig => Inner#{DDocId => true}};
false ->
Acc
end
diff --git a/src/couch_mrview/test/eunit/couch_mrview_cleanup_tests.erl
b/src/couch_mrview/test/eunit/couch_mrview_cleanup_tests.erl
new file mode 100644
index 000000000..52aee1332
--- /dev/null
+++ b/src/couch_mrview/test/eunit/couch_mrview_cleanup_tests.erl
@@ -0,0 +1,252 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_mrview_cleanup_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(TEST_INDEX, test_index).
+-define(DDOC_ID, <<"idx_name">>).
+
+start() ->
+ fake_index(),
+ Ctx = test_util:start_couch([mem3, fabric]),
+ config:set("couchdb", "index_cleanup_delay_msec", "60000", false),
+ DbName = ?tempdb(),
+ ok = fabric:create_db(DbName, [?ADMIN_CTX]),
+ {Ctx, DbName}.
+
+stop({Ctx, DbName}) ->
+ meck:unload(?TEST_INDEX),
+ ok = fabric:delete_db(DbName, [?ADMIN_CTX]),
+ DbDir = config:get("couchdb", "database_dir", "."),
+ WaitFun = fun() ->
+ filelib:fold_files(
+ DbDir,
+ <<".*", DbName/binary, "\.[0-9]+.*">>,
+ true,
+ fun(_F, _A) -> wait end,
+ ok
+ )
+ end,
+ ok = test_util:wait(WaitFun),
+ config:delete("couchdb", "index_cleanup_delay_msec", false),
+ test_util:stop_couch(Ctx),
+ ok.
+
+cleanup_test_() ->
+ {
+ "couch_mrview_cleanup",
+ {
+ foreach,
+ fun start/0,
+ fun stop/1,
+ [
+ ?TDEF_FE(t_orphan_sigs_are_reaped),
+ ?TDEF_FE(t_valid_sigs_survive),
+ ?TDEF_FE(t_shared_sig_drops_stale_ddoc_row),
+ ?TDEF_FE(t_schedule_dedupes_within_window)
+ ]
+ }
+ }.
+
+% Sig is now invalid, reap it!
+t_orphan_sigs_are_reaped({_Ctx, DbName}) ->
+ [DbShard1 | RestDbShards] = open_shards(DbName),
+ {DDoc, DbShard} = create_ddoc(DbShard1, ?DDOC_ID),
+ DbShards = [DbShard | RestDbShards],
+ N = length(DbShards),
+ spawn_indexers(DbShards, DDoc),
+ IndexesBefore = get_indexes_by_ddoc(?DDOC_ID, N),
+ ?assertEqual(N, length(IndexesBefore)),
+
+ ok = meck:reset(?TEST_INDEX),
+ lists:foreach(
+ fun(DbShardName) ->
+ couch_mrview_cleanup:cleanup_processes(DbShardName, #{})
+ end,
+ [couch_db:name(S) || S <- DbShards]
+ ),
+
+ wait_until_dead(IndexesBefore),
+ ?assertEqual(0, length(get_indexes_by_ddoc(?DDOC_ID, 0))),
+ ?assertEqual(0, length(lists:filter(fun is_process_alive/1,
IndexesBefore))).
+
+% Sig is valid, leave it alone
+t_valid_sigs_survive({_Ctx, DbName}) ->
+ [DbShard1 | RestDbShards] = open_shards(DbName),
+ {DDoc, DbShard} = create_ddoc(DbShard1, ?DDOC_ID),
+ DbShards = [DbShard | RestDbShards],
+ N = length(DbShards),
+ spawn_indexers(DbShards, DDoc),
+ IndexesBefore = get_indexes_by_ddoc(?DDOC_ID, N),
+ ?assertEqual(N, length(IndexesBefore)),
+
+ % All test indexes share the same sig (as mocked)
+ [{_, {_, RawSig}} | _] = ets:match_object(
+ couch_index_server:by_db(couch_db:name(DbShard)),
+ {couch_db:name(DbShard), {?DDOC_ID, '$1'}}
+ ),
+ ValidSigs = #{couch_util:to_hex_bin(RawSig) => #{?DDOC_ID => true}},
+
+ lists:foreach(
+ fun(DbShardName) ->
+ couch_mrview_cleanup:cleanup_processes(DbShardName, ValidSigs)
+ end,
+ [couch_db:name(S) || S <- DbShards]
+ ),
+
+ timer:sleep(100),
+ ?assertEqual(N, length(get_indexes_by_ddoc(?DDOC_ID, N))),
+ ?assertEqual(N, length(lists:filter(fun is_process_alive/1,
IndexesBefore))).
+
+% Two ddocs share the same sig. Indexer has to stay alive old ddoc removed
+t_shared_sig_drops_stale_ddoc_row({_Ctx, DbName}) ->
+ [DbShard1 | RestDbShards] = open_shards(DbName),
+ {DDoc, DbShard} = create_ddoc(DbShard1, ?DDOC_ID),
+ DbShards = [DbShard | RestDbShards],
+ N = length(DbShards),
+ spawn_indexers(DbShards, DDoc),
+ IndexesBefore = get_indexes_by_ddoc(?DDOC_ID, N),
+ ?assertEqual(N, length(IndexesBefore)),
+
+ [{_, {_, RawSig}} | _] = ets:match_object(
+ couch_index_server:by_db(couch_db:name(DbShard)),
+ {couch_db:name(DbShard), {?DDOC_ID, '$1'}}
+ ),
+ OtherDDocId = <<"some_other_ddoc">>,
+ ValidSigs = #{couch_util:to_hex_bin(RawSig) => #{OtherDDocId => true}},
+
+ lists:foreach(
+ fun(DbShardName) ->
+ couch_mrview_cleanup:cleanup_processes(DbShardName, ValidSigs)
+ end,
+ [couch_db:name(S) || S <- DbShards]
+ ),
+
+ test_util:wait(fun() ->
+ Stale = lists:flatmap(
+ fun(I) ->
+ ets:match_object(
+ couch_index_server:by_db(I), {'$1', {?DDOC_ID, '$2'}}
+ )
+ end,
+ seq()
+ ),
+ case Stale of
+ [] -> ok;
+ _ -> wait
+ end
+ end),
+ ?assertEqual(N, length(lists:filter(fun is_process_alive/1,
IndexesBefore))).
+
+% Three schedule calls should dedup
+t_schedule_dedupes_within_window({_Ctx, DbName}) ->
+ ClusteredDbName = mem3:dbname(DbName),
+ ok = couch_index_cleanup:schedule(ClusteredDbName),
+ ok = couch_index_cleanup:schedule(ClusteredDbName),
+ ok = couch_index_cleanup:schedule(ClusteredDbName),
+ ?assertEqual([ClusteredDbName], pending_dbnames()).
+
+% Helpers. Some copied from other tests
+
+open_shards(DbName) ->
+ lists:map(
+ fun(Sh) ->
+ {ok, ShardDb} = couch_db:open(mem3:name(Sh), []),
+ ShardDb
+ end,
+ mem3:local_shards(mem3:dbname(DbName))
+ ).
+
+create_ddoc(Db, DDocID) ->
+ DDocJson = couch_doc:from_json_obj(
+ {[
+ {<<"_id">>, DDocID},
+ {<<"value">>, 1}
+ ]}
+ ),
+ {ok, _Rev} = couch_db:update_doc(Db, DDocJson, []),
+ {ok, Db1} = couch_db:reopen(Db),
+ {ok, DDoc} = couch_db:open_doc(Db1, DDocID, [ejson_body, ?ADMIN_CTX]),
+ {DDoc, Db1}.
+
+spawn_indexers(DbShards, DDoc) ->
+ ok = meck:reset(?TEST_INDEX),
+ lists:foreach(
+ fun(ShardDb) ->
+ couch_index_server:get_index(?TEST_INDEX, ShardDb, DDoc)
+ end,
+ DbShards
+ ).
+
+fake_index() ->
+ ok = meck:new([?TEST_INDEX], [non_strict]),
+ ok = meck:expect(?TEST_INDEX, init, fun(Db, DDoc) ->
+ {ok, {couch_db:name(Db), DDoc}}
+ end),
+ ok = meck:expect(?TEST_INDEX, open, fun(_Db, State) ->
+ {ok, State}
+ end),
+ ok = meck:expect(?TEST_INDEX, get, fun
+ (db_name, {DbName, _DDoc}) ->
+ DbName;
+ (idx_name, {_DbName, DDoc}) ->
+ DDoc#doc.id;
+ (signature, {_DbName, DDoc}) ->
+ couch_hash:md5_hash(term_to_binary(DDoc));
+ (update_seq, Seq) ->
+ Seq
+ end),
+ ok = meck:expect(?TEST_INDEX, shutdown, ['_'], ok).
+
+get_indexes_by_ddoc(DDocID, N) ->
+ Indexes = test_util:wait(fun() ->
+ Indxs = lists:flatmap(
+ fun(I) ->
+ ets:match_object(
+ couch_index_server:by_db(I), {'$1', {DDocID, '$2'}}
+ )
+ end,
+ seq()
+ ),
+ case length(Indxs) == N of
+ true -> Indxs;
+ false -> wait
+ end
+ end),
+ lists:foldl(
+ fun({DbName, {_DDocID, Sig}}, Acc) ->
+ case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig})
of
+ [{_, Pid}] -> [Pid | Acc];
+ _ -> Acc
+ end
+ end,
+ [],
+ Indexes
+ ).
+
+wait_until_dead(Pids) ->
+ test_util:wait(fun() ->
+ case lists:filter(fun is_process_alive/1, Pids) of
+ [] -> ok;
+ _ -> wait
+ end
+ end).
+
+pending_dbnames() ->
+ {st, Pending} = sys:get_state(couch_index_cleanup),
+ lists:sort(maps:keys(Pending)).
+
+seq() ->
+ lists:seq(1, couch_index_server:num_servers()).
diff --git a/src/couch_mrview/test/eunit/couch_mrview_util_tests.erl
b/src/couch_mrview/test/eunit/couch_mrview_util_tests.erl
index c304dcdad..75614b728 100644
--- a/src/couch_mrview/test/eunit/couch_mrview_util_tests.erl
+++ b/src/couch_mrview/test/eunit/couch_mrview_util_tests.erl
@@ -98,7 +98,8 @@ t_get_signatures_local({_, Db}) ->
Sigs = couch_mrview_util:get_signatures(DbName),
?assert(is_map(Sigs)),
?assertEqual(1, map_size(Sigs)),
- [{Sig, true}] = maps:to_list(Sigs),
+ [{Sig, DDocs}] = maps:to_list(Sigs),
+ ?assertEqual(#{?DDOC_ID => true}, DDocs),
{ok, Info} = couch_mrview:get_info(Db, ?DDOC_ID),
?assertEqual(proplists:get_value(signature, Info), Sig),
@@ -115,7 +116,8 @@ t_get_signatures_clustered({DbName, _Db}) ->
?assertEqual(Sigs, couch_mrview_util:get_signatures(ShardName2)),
?assert(is_map(Sigs)),
?assertEqual(1, map_size(Sigs)),
- [{Sig, true}] = maps:to_list(Sigs),
+ [{Sig, DDocs}] = maps:to_list(Sigs),
+ ?assertEqual(#{?DDOC_ID => true}, DDocs),
{ok, Info} = couch_mrview:get_info(ShardName1, ?DDOC_ID),
?assertEqual(proplists:get_value(signature, Info), Sig),