This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch fix-index-server-premature-return in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 616e174265710fb1f94b2cda69cd6726baaa7e5e Author: Nick Vatamaniuc <[email protected]> AuthorDate: Tue Oct 17 01:25:57 2023 -0400 Prevent delayed opener error from crashing index servers Previously, an index and opener process dying could have resulted in the index gen_server crashing. This was observed in the CI test as described in: https://github.com/apache/couchdb/issues/4809 The process in more detail was as follows: * When an async opener result is handled in the index server, there is a period of time when the index server is linked to both the index and the opener process. * After we reply early to the waiting clients, a client may do something to cause the indexer to crash, which would crash the opener along with it. That would generate two `{'EXIT', Pid, _}` messages queued in the indexer process' mailbox. * The index gen_server is still processing the async opener result callback, where it would remove the opener from the `openers` table, then it returns `ok` to the async opener. * Index gen_server continues processing queued `EXIT` messages in `handle_info`: - The one for the indexer Pid is handled appropriately - The one for the opener leads to an `exit(...)` clause since we ended up with an unknown process exiting. To avoid the race condition, and the extra opener `EXIT` message, unlink and reply early to the opener, as soon as we have linked to the indexer or received the error. To avoid the small chance of still getting an `EXIT` message from the opener, in case it crashed right before we unlinked, flush any exit messages from it. We do a similar flushing in two other places so create a small utility function to avoid duplicating the code too much. 
Fix: https://github.com/apache/couchdb/issues/4809 --- src/couch_index/src/couch_index_server.erl | 48 +++++++++++++++++------------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl index 35df43d2a..807f87a88 100644 --- a/src/couch_index/src/couch_index_server.erl +++ b/src/couch_index/src/couch_index_server.erl @@ -167,19 +167,29 @@ handle_call({get_index, {_Mod, _IdxState, DbName, Sig} = Args}, From, State) -> [{_, Pid}] when is_pid(Pid) -> {reply, {ok, Pid}, State} end; -handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _}, State) -> - [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}), - [gen_server:reply(From, {ok, Pid}) || From <- Waiters], +handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _} = FromOpener, State) -> link(Pid), + % Once linked with the indexer, dismiss and ignore the opener. + unlink(OpenerPid), ets:delete(State#st.openers, OpenerPid), - add_to_ets(DbName, Sig, DDocId, Pid, State), - {reply, ok, State}; -handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _}, State) -> + gen_server:reply(FromOpener, ok), [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}), - [gen_server:reply(From, Error) || From <- Waiters], + add_to_ets(DbName, Sig, DDocId, Pid, State), + [gen_server:reply(From, {ok, Pid}) || From <- Waiters], + % Flush opener exit messages in case it died before we unlinked it + ok = flush_exit_messages_from(OpenerPid), + {noreply, State}; +handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _} = FromOpener, State) -> + % Once opener reported the error, we can dismiss the opener + unlink(OpenerPid), ets:delete(State#st.openers, OpenerPid), + gen_server:reply(FromOpener, ok), + [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}), ets:delete(State#st.by_sig, {DbName, Sig}), - {reply, ok, State}; + [gen_server:reply(From, Error) || From 
<- Waiters], + % Flush opener exit messages in case it died before we unlinked it + ok = flush_exit_messages_from(OpenerPid), + {noreply, State}; handle_call({reset_indexes, DbName}, _From, State) -> reset_indexes(DbName, State), {reply, ok, State}. @@ -283,12 +293,7 @@ reset_indexes(DbName, #st{} = State) -> [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}), unlink(Pid), gen_server:cast(Pid, delete), - receive - {'EXIT', Pid, _} -> - ok - after 0 -> - ok - end, + ok = flush_exit_messages_from(Pid), rem_from_ets(DbName, Sig, DDocIds, Pid, State) end, lists:foreach(Fun, dict:to_list(SigDDocIds)), @@ -327,12 +332,7 @@ rem_from_ets(DbName, #st{} = State) -> Fun = fun({Sig, DDocIds}) -> [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}), unlink(Pid), - receive - {'EXIT', Pid, _} -> - ok - after 0 -> - ok - end, + ok = flush_exit_messages_from(Pid), rem_from_ets(DbName, Sig, DDocIds, Pid, State) end, lists:foreach(Fun, dict:to_list(SigDDocIds)). @@ -442,3 +442,11 @@ aggregate_queue_len() -> || Name <- Names ], lists:sum([X || {_, X} <- MQs]). + +flush_exit_messages_from(Pid) -> + receive + {'EXIT', Pid, _} -> + ok + after 0 -> + ok + end.
