This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new d86847d8a Make sure to reply to couch_index_server clients
d86847d8a is described below

commit d86847d8a6087d7b6c7979987856ff9241b8ae11
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Thu Dec 14 16:03:04 2023 -0500

    Make sure to reply to couch_index_server clients
    
    Previously we didn't always reply to waiting `get_index` calls, as sometimes we
    clobbered or deleted the waiters in the `by_sig` ETS table.
    
    Ref: https://github.com/apache/couchdb/pull/4491
---
 src/couch_index/src/couch_index_server.erl         |  60 ++++---
 .../test/eunit/couch_index_crash_tests.erl         | 188 +++++++++++++++++++++
 2 files changed, 228 insertions(+), 20 deletions(-)

diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl
index 549f8a226..973b06337 100644
--- a/src/couch_index/src/couch_index_server.erl
+++ b/src/couch_index/src/couch_index_server.erl
@@ -171,9 +171,8 @@ handle_call({async_open, {DbName, DDocId, Sig}, {ok, Pid}}, {OpenerPid, _} = Fro
     unlink(OpenerPid),
     ets:delete(State#st.openers, OpenerPid),
     gen_server:reply(FromOpener, ok),
-    [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
+    ok = reply_to_waiters(DbName, Sig, {ok, Pid}, State),
     add_to_ets(DbName, Sig, DDocId, Pid, State),
-    [gen_server:reply(From, {ok, Pid}) || From <- Waiters],
     % Flush opener exit messages in case it died before we unlinked it
     ok = flush_exit_messages_from(OpenerPid),
     {noreply, State};
@@ -182,9 +181,8 @@ handle_call({async_error, {DbName, _DDocId, Sig}, Error}, {OpenerPid, _} = FromO
     unlink(OpenerPid),
     ets:delete(State#st.openers, OpenerPid),
     gen_server:reply(FromOpener, ok),
-    [{_, Waiters}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
+    ok = reply_to_waiters(DbName, Sig, Error, State),
     ets:delete(State#st.by_sig, {DbName, Sig}),
-    [gen_server:reply(From, Error) || From <- Waiters],
     % Flush opener exit messages in case it died before we unlinked it
     ok = flush_exit_messages_from(OpenerPid),
     {noreply, State};
@@ -207,8 +205,8 @@ handle_cast({add_to_ets, [Pid, DbName, DDocId, Sig]}, State) ->
 handle_cast({rem_from_ets, [DbName, DDocId, Sig]}, State) ->
     ets:delete_object(State#st.by_db, {DbName, {DDocId, Sig}}),
     {noreply, State};
-handle_cast({rem_from_ets, [DbName]}, State) ->
-    rem_from_ets(DbName, State),
+handle_cast({rem_from_ets, [DbName, Reason]}, State) ->
+    rem_from_ets_with_reply(DbName, Reason, State),
     {noreply, State}.
 
 handle_info({'EXIT', Pid, Reason}, Server) ->
@@ -218,10 +216,11 @@ handle_info({'EXIT', Pid, Reason}, Server) ->
          || {_, {DDocId, _}} <-
                 ets:match_object(Server#st.by_db, {DbName, {'$1', Sig}})
         ],
-        rem_from_ets(DbName, Sig, DDocIds, Pid, Server)
+        rem_from_ets_with_reply(DbName, Sig, DDocIds, Reason, Server)
     end,
     case ets:lookup(Server#st.by_pid, Pid) of
         [{Pid, {DbName, Sig}}] ->
+            ets:delete(Server#st.by_pid, Pid),
             Cleanup(DbName, Sig);
         [] when Reason /= normal ->
             case ets:lookup(Server#st.openers, Pid) of
@@ -285,11 +284,16 @@ reset_indexes(DbName, #st{} = State) ->
         ets:lookup(State#st.by_db, DbName)
     ),
     Fun = fun({Sig, DDocIds}) ->
-        [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
-        unlink(Pid),
-        gen_server:cast(Pid, delete),
-        ok = flush_exit_messages_from(Pid),
-        rem_from_ets(DbName, Sig, DDocIds, Pid, State)
+        case ets:lookup(State#st.by_sig, {DbName, Sig}) of
+            [{_, Pid}] when is_pid(Pid) ->
+                unlink(Pid),
+                gen_server:cast(Pid, delete),
+                ok = flush_exit_messages_from(Pid),
+                ets:delete(State#st.by_pid, Pid);
+            _ ->
+                ok
+        end,
+        rem_from_ets_with_reply(DbName, Sig, DDocIds, {error, index_reset}, 
State)
     end,
     lists:foreach(Fun, dict:to_list(SigDDocIds)),
     % We only need one of the index servers to do this.
@@ -301,14 +305,25 @@ reset_indexes(DbName, #st{} = State) ->
             ok
     end.
 
+reply_to_waiters(DbName, Sig, Reply, St = #st{}) ->
+    case ets:lookup(St#st.by_sig, {DbName, Sig}) of
+        [{_, Waiters}] when is_list(Waiters) ->
+            [gen_server:reply(From, Reply) || From <- Waiters];
+        [{_, Pid}] when is_pid(Pid) ->
+            [];
+        [] ->
+            []
+    end,
+    ok.
+
 add_to_ets(DbName, Sig, DDocId, Pid, #st{} = St) ->
     ets:insert(St#st.by_sig, {{DbName, Sig}, Pid}),
     ets:insert(St#st.by_pid, {Pid, {DbName, Sig}}),
     ets:insert(St#st.by_db, {DbName, {DDocId, Sig}}).
 
-rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
+rem_from_ets_with_reply(DbName, Sig, DDocIds, Reply, #st{} = St) ->
+    ok = reply_to_waiters(DbName, Sig, Reply, St),
     ets:delete(St#st.by_sig, {DbName, Sig}),
-    ets:delete(St#st.by_pid, Pid),
     lists:foreach(
         fun(DDocId) ->
             ets:delete_object(St#st.by_db, {DbName, {DDocId, Sig}})
@@ -316,7 +331,7 @@ rem_from_ets(DbName, Sig, DDocIds, Pid, #st{} = St) ->
         DDocIds
     ).
 
-rem_from_ets(DbName, #st{} = State) ->
+rem_from_ets_with_reply(DbName, Reply, #st{} = State) ->
     SigDDocIds = lists:foldl(
         fun({_, {DDocId, Sig}}, DDict) ->
             dict:append(Sig, DDocId, DDict)
@@ -325,10 +340,15 @@ rem_from_ets(DbName, #st{} = State) ->
         ets:lookup(State#st.by_db, DbName)
     ),
     Fun = fun({Sig, DDocIds}) ->
-        [{_, Pid}] = ets:lookup(State#st.by_sig, {DbName, Sig}),
-        unlink(Pid),
-        ok = flush_exit_messages_from(Pid),
-        rem_from_ets(DbName, Sig, DDocIds, Pid, State)
+        case ets:lookup(State#st.by_sig, {DbName, Sig}) of
+            [{_, Pid}] when is_pid(Pid) ->
+                unlink(Pid),
+                ok = flush_exit_messages_from(Pid),
+                ets:delete(State#st.by_pid, Pid);
+            _ ->
+                ok
+        end,
+        rem_from_ets_with_reply(DbName, Sig, DDocIds, Reply, State)
     end,
     lists:foreach(Fun, dict:to_list(SigDDocIds)).
 
@@ -380,7 +400,7 @@ handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) ->
             couch_log:warning("~p: handle_db_event ~p for db ~p, reason ~p, 
stack ~p", [
                 ?MODULE, Class, DbName, Reason, Stack
             ]),
-            gen_server:cast(St#st.server_name, {rem_from_ets, [DbName]}),
+            gen_server:cast(St#st.server_name, {rem_from_ets, [DbName, 
Reason]}),
             {ok, St}
     end;
 handle_db_event(DbName, {ddoc_updated, DDocId}, St) ->
diff --git a/src/couch_index/test/eunit/couch_index_crash_tests.erl b/src/couch_index/test/eunit/couch_index_crash_tests.erl
index 88f8dc845..c684ecdaa 100644
--- a/src/couch_index/test/eunit/couch_index_crash_tests.erl
+++ b/src/couch_index/test/eunit/couch_index_crash_tests.erl
@@ -78,6 +78,21 @@ index_crash_test_() ->
         }
     }.
 
+index_crash_test_with_client_waiting_test_() ->
+    {
+        "Simulate index crashing while clients are waiting",
+        {
+            foreach,
+            fun start/0,
+            fun stop/1,
+            [
+                ?TDEF_FE(t_index_open_with_clients_waiting),
+                ?TDEF_FE(t_index_open_crashes_with_client_waiting),
+                ?TDEF_FE(t_index_open_returns_error_with_client_waiting)
+            ]
+        }
+    }.
+
 t_can_open_mock_index({_Ctx, DbName}) ->
     failing_index(dontfail),
 
@@ -173,6 +188,142 @@ t_index_process_dies({_Ctx, DbName}) ->
     ServerPids2 = lists:sort([whereis(N) || N <- couch_index_server:names()]),
     ?assertEqual(ServerPids, ServerPids2).
 
+t_index_open_with_clients_waiting({_Ctx, DbName}) ->
+    failing_index({blockopen, self()}),
+
+    [DbShard1] = open_shards(DbName),
+
+    % create a DDoc on Db1
+    {DDoc, DbShard} = create_ddoc(DbShard1, <<"idx_name">>),
+
+    meck:reset(couch_index_server),
+    {_, CRef1} = spawn_client(DbShard, DDoc),
+    BlockPid =
+        receive
+            {blockopen, Pid} -> Pid
+        end,
+    {_, CRef2} = spawn_client(DbShard, DDoc),
+
+    % Clients are waiting for the response
+    receive
+        {'DOWN', CRef1, _, _, _} ->
+            ?assert(false, "should not have received a response yet");
+        {'DOWN', CRef2, _, _, _} ->
+            ?assert(false, "should not have received a response yet")
+    after 500 ->
+        ?assert(true)
+    end,
+
+    BlockPid ! continue,
+
+    {ok, IdxPid1} = wait_client_or_fail(CRef1),
+    {ok, IdxPid2} = wait_client_or_fail(CRef2),
+    ?assert(is_pid(IdxPid1)),
+    ?assert(is_pid(IdxPid2)),
+    ?assertEqual(IdxPid1, IdxPid2),
+
+    ?assertEqual(0, couch_index_server:aggregate_queue_len()),
+
+    %% assert opener ets table is empty
+    lists:foreach(fun(I) -> ?assertEqual([], openers(I)) end, seq()),
+
+    ?assert(meck:called(couch_index_server, handle_call, [{async_open, '_', 
'_'}, '_', '_'])),
+
+    % here we should get the pid from the ets table
+    ?assertMatch({ok, _}, get_index(DbShard, DDoc)),
+    {ok, IdxPid3} = get_index(DbShard, DDoc),
+    ?assertEqual(IdxPid1, IdxPid3).
+
+t_index_open_crashes_with_client_waiting({_Ctx, DbName}) ->
+    failing_index({blockopen, self()}),
+
+    [DbShard1] = open_shards(DbName),
+
+    % create a DDoc on Db1
+    {DDoc, DbShard} = create_ddoc(DbShard1, <<"idx_name">>),
+
+    meck:reset(couch_index_server),
+    {_, CRef1} = spawn_client(DbShard, DDoc),
+    BlockPid =
+        receive
+            {blockopen, Pid} -> Pid
+        end,
+    {_, CRef2} = spawn_client(DbShard, DDoc),
+
+    % Clients are waiting for the response
+    receive
+        {'DOWN', CRef1, _, _, _} ->
+            ?assert(false, "should not have received a response yet");
+        {'DOWN', CRef2, _, _, _} ->
+            ?assert(false, "should not have received a response yet")
+    after 500 ->
+        ?assert(true)
+    end,
+
+    exit(BlockPid, kill),
+
+    Res1 = wait_client_or_fail(CRef1),
+    Res2 = wait_client_or_fail(CRef2),
+    ?assertMatch(killed, Res1),
+    ?assertMatch(killed, Res2),
+
+    ?assertEqual(0, couch_index_server:aggregate_queue_len()),
+
+    %% assert ETS tables are empty
+    lists:foreach(
+        fun(I) ->
+            ?assertEqual([], openers(I)),
+            ?assertEqual([], by_db(I)),
+            ?assertEqual([], by_sig(I))
+        end,
+        seq()
+    ).
+
+t_index_open_returns_error_with_client_waiting({_Ctx, DbName}) ->
+    failing_index({blockopen, self()}),
+
+    [DbShard1] = open_shards(DbName),
+
+    % create a DDoc on Db1
+    {DDoc, DbShard} = create_ddoc(DbShard1, <<"idx_name">>),
+
+    meck:reset(couch_index_server),
+    {_, CRef1} = spawn_client(DbShard, DDoc),
+    BlockPid =
+        receive
+            {blockopen, Pid} -> Pid
+        end,
+    {_, CRef2} = spawn_client(DbShard, DDoc),
+
+    % Clients are waiting for the response
+    receive
+        {'DOWN', CRef1, _, _, _} ->
+            ?assert(false, "should not have received a response yet");
+        {'DOWN', CRef2, _, _, _} ->
+            ?assert(false, "should not have received a response yet")
+    after 500 ->
+        ?assert(true)
+    end,
+
+    BlockPid ! {return, retfail},
+
+    Res1 = wait_client_or_fail(CRef1),
+    Res2 = wait_client_or_fail(CRef2),
+    ?assertMatch(retfail, Res1),
+    ?assertMatch(retfail, Res2),
+
+    ?assertEqual(0, couch_index_server:aggregate_queue_len()),
+
+    %% assert openers ETS table is empty
+    lists:foreach(
+        fun(I) ->
+            ?assertEqual([], openers(I)),
+            ?assertEqual([], by_db(I)),
+            ?assertEqual([], by_sig(I))
+        end,
+        seq()
+    ).
+
 create_ddoc(Db, DDocID) ->
     DDocJson = couch_doc:from_json_obj(
         {[
@@ -200,6 +351,12 @@ get_index(ShardDb, DDoc) ->
 openers(I) ->
     ets:tab2list(couch_index_server:openers(I)).
 
+by_db(I) ->
+    ets:tab2list(couch_index_server:by_db(I)).
+
+by_sig(I) ->
+    ets:tab2list(couch_index_server:by_sig(I)).
+
 failing_index(Error) ->
     ok = meck:new([?TEST_INDEX], [non_strict]),
     ok = meck:expect(?TEST_INDEX, init, fun(Db, DDoc) ->
@@ -209,6 +366,13 @@ failing_index(Error) ->
         case Error of
             dontfail ->
                 {ok, State};
+            {blockopen, ControllerPid} ->
+                case block(blockopen, ControllerPid) of
+                    {return, Err} ->
+                        Err;
+                    continue ->
+                        {ok, State}
+                end;
             {return, Err} ->
                 {error, Err};
             {raise, Err} ->
@@ -231,3 +395,27 @@ failing_index(Error) ->
 
 seq() ->
     lists:seq(1, couch_index_server:num_servers()).
+
+block(BlockMsg, ControllerPid) when is_atom(BlockMsg), is_pid(ControllerPid) ->
+    ControllerPid ! {BlockMsg, self()},
+    receive
+        continue ->
+            continue;
+        {return, Err} ->
+            {return, Err};
+        {raise, Err} ->
+            meck:raise(error, Err);
+        {exit, Err} ->
+            meck:raise(exit, Err)
+    end.
+
+spawn_client(ShardDb, DDoc) ->
+    spawn_monitor(fun() -> exit(get_index(ShardDb, DDoc)) end).
+
+wait_client_or_fail(Ref) ->
+    receive
+        {'DOWN', Ref, _, _, Res} ->
+            Res
+    after 500 ->
+        ?assert(false, "Failed to get index")
+    end.

Reply via email to