This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
new d86847d8a Make sure to reply to couch_index_server clients
d86847d8a is described below
commit d86847d8a6087d7b6c7979987856ff9241b8ae11
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Thu Dec 14 16:03:04 2023 -0500
Make sure to reply to couch_index_server clients
Previously we didn't always reply to waiting `get_index` calls, as
sometimes we clobbered or deleted the waiters in the `by_sig` ETS table.
Ref: https://github.com/apache/couchdb/pull/4491
---
src/couch_index/src/couch_index_server.erl | 60 ++++---
.../test/eunit/couch_index_crash_tests.erl | 188 +++++++++++++++++++++
2 files changed, 228 insertions(+), 20 deletions(-)
diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 549f8a226..973b06337 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -171,9 +171,8 @@ handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _} = Fro
unlink(OpenerPid),
ets:delete(State#st.openers, OpenerPid),
gen_server:reply(FromOpener, ok),
- [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
+ ok = reply_to_waiters(DbName, Sig, {ok, Pid}, State),
add_to_ets(DbName, Sig, DDocId, Pid, State),
- [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
% Flush opener exit messages in case it died before we unlinked it
ok = flush_exit_messages_from(OpenerPid),
{noreply, State};
@@ -182,9 +181,8 @@ handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _} = FromO
unlink(OpenerPid),
ets:delete(State#st.openers, OpenerPid),
gen_server:reply(FromOpener, ok),
- [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
+ ok = reply_to_waiters(DbName, Sig, Error, State),
ets:delete(State#st.by_sig, {DbName, Sig}),
- [gen_server:reply(From, Error) || From <- Waiters],
% Flush opener exit messages in case it died before we unlinked it
ok = flush_exit_messages_from(OpenerPid),
{noreply, State};
@@ -207,8 +205,8 @@ handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) ->
handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) ->
ets:delete_object(State#st.by_db, {DbName, {DDocId, Sig}}),
{noreply, State};
-handle_cast({rem_from_ets, [DbName]}, State) ->
- rem_from_ets(DbName, State),
+handle_cast({rem_from_ets, [DbName, Reason]}, State) ->
+ rem_from_ets_with_reply(DbName, Reason, State),
{noreply, State}.
handle_info({'EXIT', Pid, Reason}, Server) ->
@@ -218,10 +216,11 @@ handle_info({'EXIT', Pid, Reason}, Server) ->
|| {_, {DDocId, _}} <-
ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})
],
- rem_from_ets(DbName, Sig, DDocIds, Pid, Server)
+ rem_from_ets_with_reply(DbName, Sig, DDocIds, Reason, Server)
end,
case ets:lookup(Server#st.by_pid, Pid) of
[{Pid, {DbName, Sig}}] ->
+ ets:delete(Server#st.by_pid, Pid),
Cleanup(DbName, Sig);
[] when Reason /= normal ->
case ets:lookup(Server#st.openers, Pid) of
@@ -285,11 +284,16 @@ reset_indexes(DbName, #st{} = State) ->
ets:lookup(State#st.by_db, DbName)
),
Fun = fun({Sig, DDocIds}) ->
- [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
- unlink(Pid),
- gen_server:cast(Pid, delete),
- ok = flush_exit_messages_from(Pid),
- rem_from_ets(DbName, Sig, DDocIds, Pid, State)
+ case ets:lookup(State#st.by_sig, {DbName, Sig}) of
+ [{_, Pid}] when is_pid(Pid) ->
+ unlink(Pid),
+ gen_server:cast(Pid, delete),
+ ok = flush_exit_messages_from(Pid),
+ ets:delete(State#st.by_pid, Pid);
+ _ ->
+ ok
+ end,
+ rem_from_ets_with_reply(DbName, Sig, DDocIds, {error, index_reset},
State)
end,
lists:foreach(Fun, dict:to_list(SigDDocIds)),
% We only need one of the index servers to do this.
@@ -301,14 +305,25 @@ reset_indexes(DbName, #st{} = State) ->
ok
end.
+reply_to_waiters(DbName, Sig, Reply, St = #st{}) ->
+ case ets:lookup(St#st.by_sig, {DbName, Sig}) of
+ [{_, Waiters}] when is_list(Waiters) ->
+ [gen_server:reply(From, Reply) || From <- Waiters];
+ [{_, Pid}] when is_pid(Pid) ->
+ [];
+ [] ->
+ []
+ end,
+ ok.
+
add_to_ets(DbName, Sig, DDocId, Pid, #st{} = St) ->
ets:insert(St#st.by_sig, {{DbName, Sig}, Pid}),
ets:insert(St#st.by_pid, {Pid, {DbName, Sig}}),
ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}).
-rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
+rem_from_ets_with_reply(DbName, Sig, DDocIds, Reply, #st{} = St) ->
+ ok = reply_to_waiters(DbName, Sig, Reply, St),
ets:delete(St#st.by_sig, {DbName, Sig}),
- ets:delete(St#st.by_pid, Pid),
lists:foreach(
fun(DDocId) ->
ets:delete_object(St#st.by_db, {DbName, {DDocId, Sig}})
@@ -316,7 +331,7 @@ rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
DDocIds
).
-rem_from_ets(DbName, #st{} = State) ->
+rem_from_ets_with_reply(DbName, Reply, #st{} = State) ->
SigDDocIds = lists:foldl(
fun({_, {DDocId, Sig}}, DDict) ->
dict:append(Sig, DDocId, DDict)
@@ -325,10 +340,15 @@ rem_from_ets(DbName, #st{} = State) ->
ets:lookup(State#st.by_db, DbName)
),
Fun = fun({Sig, DDocIds}) ->
- [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
- unlink(Pid),
- ok = flush_exit_messages_from(Pid),
- rem_from_ets(DbName, Sig, DDocIds, Pid, State)
+ case ets:lookup(State#st.by_sig, {DbName, Sig}) of
+ [{_, Pid}] when is_pid(Pid) ->
+ unlink(Pid),
+ ok = flush_exit_messages_from(Pid),
+ ets:delete(State#st.by_pid, Pid);
+ _ ->
+ ok
+ end,
+ rem_from_ets_with_reply(DbName, Sig, DDocIds, Reply, State)
end,
lists:foreach(Fun, dict:to_list(SigDDocIds)).
@@ -380,7 +400,7 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
couch_log:warning("~p: handle_db_event ~p for db ~p, reason ~p,
stack ~p", [
?MODULE, Class, DbName, Reason, Stack
]),
- gen_server:cast(St#st.server_name, {rem_from_ets, [DbName]}),
+ gen_server:cast(St#st.server_name, {rem_from_ets, [DbName,
Reason]}),
{ok, St}
end;
handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
diff --git a/src/couch_index/test/eunit/couch_index_crash_tests.erl b/src/couch_index/test/eunit/couch_index_crash_tests.erl
index 88f8dc845..c684ecdaa 100644
--- a/src/couch_index/test/eunit/couch_index_crash_tests.erl
+++ b/src/couch_index/test/eunit/couch_index_crash_tests.erl
@@ -78,6 +78,21 @@ index_crash_test_() ->
}
}.
+index_crash_test_with_client_waiting_test_() ->
+ {
+ "Simulate index crashing while clients are waiting",
+ {
+ foreach,
+ fun start/0,
+ fun stop/1,
+ [
+ ?TDEF_FE(t_index_open_with_clients_waiting),
+ ?TDEF_FE(t_index_open_crashes_with_client_waiting),
+ ?TDEF_FE(t_index_open_returns_error_with_client_waiting)
+ ]
+ }
+ }.
+
t_can_open_mock_index({_Ctx, DbName}) ->
failing_index(dontfail),
@@ -173,6 +188,142 @@ t_index_process_dies({_Ctx, DbName}) ->
ServerPids2 = lists:sort([whereis(N) || N <- couch_index_server:names()]),
?assertEqual(ServerPids, ServerPids2).
+t_index_open_with_clients_waiting({_Ctx, DbName}) ->
+ failing_index({blockopen, self()}),
+
+ [DbShard1] = open_shards(DbName),
+
+ % create a DDoc on Db1
+ {DDoc, DbShard} = create_ddoc(DbShard1, <<"idx_name">>),
+
+ meck:reset(couch_index_server),
+ {_, CRef1} = spawn_client(DbShard, DDoc),
+ BlockPid =
+ receive
+ {blockopen, Pid} -> Pid
+ end,
+ {_, CRef2} = spawn_client(DbShard, DDoc),
+
+ % Clients are waiting for the response
+ receive
+ {'DOWN', CRef1, _, _, _} ->
+ ?assert(false, "should not have received a response yet");
+ {'DOWN', CRef2, _, _, _} ->
+ ?assert(false, "should not have received a response yet")
+ after 500 ->
+ ?assert(true)
+ end,
+
+ BlockPid ! continue,
+
+ {ok, IdxPid1} = wait_client_or_fail(CRef1),
+ {ok, IdxPid2} = wait_client_or_fail(CRef2),
+ ?assert(is_pid(IdxPid1)),
+ ?assert(is_pid(IdxPid2)),
+ ?assertEqual(IdxPid1, IdxPid2),
+
+ ?assertEqual(0, couch_index_server:aggregate_queue_len()),
+
+ %% assert opener ets table is empty
+ lists:foreach(fun(I) -> ?assertEqual([], openers(I)) end, seq()),
+
+ ?assert(meck:called(couch_index_server, handle_call, [{async_open, '_',
'_'}, '_', '_'])),
+
+ % here we should get the pid from the ets table
+ ?assertMatch({ok, _}, get_index(DbShard, DDoc)),
+ {ok, IdxPid3} = get_index(DbShard, DDoc),
+ ?assertEqual(IdxPid1, IdxPid3).
+
+t_index_open_crashes_with_client_waiting({_Ctx, DbName}) ->
+ failing_index({blockopen, self()}),
+
+ [DbShard1] = open_shards(DbName),
+
+ % create a DDoc on Db1
+ {DDoc, DbShard} = create_ddoc(DbShard1, <<"idx_name">>),
+
+ meck:reset(couch_index_server),
+ {_, CRef1} = spawn_client(DbShard, DDoc),
+ BlockPid =
+ receive
+ {blockopen, Pid} -> Pid
+ end,
+ {_, CRef2} = spawn_client(DbShard, DDoc),
+
+ % Clients are waiting for the response
+ receive
+ {'DOWN', CRef1, _, _, _} ->
+ ?assert(false, "should not have received a response yet");
+ {'DOWN', CRef2, _, _, _} ->
+ ?assert(false, "should not have received a response yet")
+ after 500 ->
+ ?assert(true)
+ end,
+
+ exit(BlockPid, kill),
+
+ Res1 = wait_client_or_fail(CRef1),
+ Res2 = wait_client_or_fail(CRef2),
+ ?assertMatch(killed, Res1),
+ ?assertMatch(killed, Res2),
+
+ ?assertEqual(0, couch_index_server:aggregate_queue_len()),
+
+ %% assert ETS tables are empty
+ lists:foreach(
+ fun(I) ->
+ ?assertEqual([], openers(I)),
+ ?assertEqual([], by_db(I)),
+ ?assertEqual([], by_sig(I))
+ end,
+ seq()
+ ).
+
+t_index_open_returns_error_with_client_waiting({_Ctx, DbName}) ->
+ failing_index({blockopen, self()}),
+
+ [DbShard1] = open_shards(DbName),
+
+ % create a DDoc on Db1
+ {DDoc, DbShard} = create_ddoc(DbShard1, <<"idx_name">>),
+
+ meck:reset(couch_index_server),
+ {_, CRef1} = spawn_client(DbShard, DDoc),
+ BlockPid =
+ receive
+ {blockopen, Pid} -> Pid
+ end,
+ {_, CRef2} = spawn_client(DbShard, DDoc),
+
+ % Clients are waiting for the response
+ receive
+ {'DOWN', CRef1, _, _, _} ->
+ ?assert(false, "should not have received a response yet");
+ {'DOWN', CRef2, _, _, _} ->
+ ?assert(false, "should not have received a response yet")
+ after 500 ->
+ ?assert(true)
+ end,
+
+ BlockPid ! {return, retfail},
+
+ Res1 = wait_client_or_fail(CRef1),
+ Res2 = wait_client_or_fail(CRef2),
+ ?assertMatch(retfail, Res1),
+ ?assertMatch(retfail, Res2),
+
+ ?assertEqual(0, couch_index_server:aggregate_queue_len()),
+
+ %% assert openers ETS table is empty
+ lists:foreach(
+ fun(I) ->
+ ?assertEqual([], openers(I)),
+ ?assertEqual([], by_db(I)),
+ ?assertEqual([], by_sig(I))
+ end,
+ seq()
+ ).
+
create_ddoc(Db, DDocID) ->
DDocJson = couch_doc:from_json_obj(
{[
@@ -200,6 +351,12 @@ get_index(ShardDb, DDoc) ->
openers(I) ->
ets:tab2list(couch_index_server:openers(I)).
+by_db(I) ->
+ ets:tab2list(couch_index_server:by_db(I)).
+
+by_sig(I) ->
+ ets:tab2list(couch_index_server:by_sig(I)).
+
failing_index(Error) ->
ok = meck:new([?TEST_INDEX], [non_strict]),
ok = meck:expect(?TEST_INDEX, init, fun(Db, DDoc) ->
@@ -209,6 +366,13 @@ failing_index(Error) ->
case Error of
dontfail ->
{ok, State};
+ {blockopen, ControllerPid} ->
+ case block(blockopen, ControllerPid) of
+ {return, Err} ->
+ Err;
+ continue ->
+ {ok, State}
+ end;
{return, Err} ->
{error, Err};
{raise, Err} ->
@@ -231,3 +395,27 @@ failing_index(Error) ->
seq() ->
lists:seq(1, couch_index_server:num_servers()).
+
+block(BlockMsg, ControllerPid) when is_atom(BlockMsg), is_pid(ControllerPid) ->
+ ControllerPid ! {BlockMsg, self()},
+ receive
+ continue ->
+ continue;
+ {return, Err} ->
+ {return, Err};
+ {raise, Err} ->
+ meck:raise(error, Err);
+ {exit, Err} ->
+ meck:raise(exit, Err)
+ end.
+
+spawn_client(ShardDb, DDoc) ->
+ spawn_monitor(fun() -> exit(get_index(ShardDb, DDoc)) end).
+
+wait_client_or_fail(Ref) ->
+ receive
+ {'DOWN', Ref, _, _, Res} ->
+ Res
+ after 500 ->
+ ?assert(false, "Failed to get index")
+ end.