This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch 3.x
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/3.x by this push:
     new 0e38f3907 Improve index building during shard splitting
0e38f3907 is described below

commit 0e38f390796d6e34f698853f5886a36607e2aeec
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Fri Apr 22 19:53:13 2022 -0400

    Improve index building during shard splitting
    
    Previously we didn't check responses from the get_state/2 or await/2
    functions when building indices. If an index updater crashed, and the index
    never finished building, the get_state/2 call would simply return an error
    and the process would exit normally. Then, the shard splitting job would
    count that as a success and continue to make progress.
    
    To fix that, make sure to check the response to all the supported indexing
    types and wait until they return an `ok` result.
    
    Additionally, increase the index building resilience to allow for more
    retries on failure, and for configurable retries for individual index
    builders.
---
 rel/overlay/etc/default.ini               |   8 +-
 src/mem3/src/mem3_reshard_index.erl       | 131 ++++++++++++++++++------------
 src/mem3/src/mem3_reshard_job.erl         |  22 ++---
 src/mem3/test/eunit/mem3_reshard_test.erl |  54 ++++++++++--
 4 files changed, 142 insertions(+), 73 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 7535501c0..5fb45b5b5 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -719,7 +719,7 @@ state_dir = {{state_dir}}
 [reshard]
 ;max_jobs = 48
 ;max_history = 20
-;max_retries = 1
+;max_retries = 5
 ;retry_interval_sec = 10
 ;delete_source = true
 ;update_shard_map_timeout_sec = 60
@@ -727,6 +727,12 @@ state_dir = {{state_dir}}
 ;require_node_param = false
 ;require_range_param = false
 
+; How many times to retry building an individual index
+;index_max_retries = 5
+
+; How many seconds to wait between retries for an individual index
+;index_retry_interval_sec = 10
+
 [prometheus]
 additional_port = false
 bind_address = 127.0.0.1
diff --git a/src/mem3/src/mem3_reshard_index.erl 
b/src/mem3/src/mem3_reshard_index.erl
index fef25d52c..fa0a101b5 100644
--- a/src/mem3/src/mem3_reshard_index.erl
+++ b/src/mem3/src/mem3_reshard_index.erl
@@ -15,11 +15,15 @@
 -export([
     design_docs/1,
     target_indices/2,
-    spawn_builders/1
+    spawn_builders/1,
+    build_index/2
 ]).
 
+-define(MRVIEW, mrview).
+-define(DREYFUS, dreyfus).
+-define(HASTINGS, hastings).
+
 -include_lib("mem3/include/mem3.hrl").
--include_lib("couch/include/couch_db.hrl").
 
 %% Public API
 
@@ -44,25 +48,8 @@ target_indices(Docs, Targets) ->
     lists:flatten(Indices).
 
 spawn_builders(Indices) ->
-    Results = [build_index(Index) || Index <- Indices],
-    Oks = [{ok, Pid} || {ok, Pid} <- Results, is_pid(Pid)],
-    case Results -- Oks of
-        [] ->
-            {ok, [Pid || {ok, Pid} <- Results]};
-        Error ->
-            % Do a all or nothing pattern, if some indices could not be
-            % spawned, kill the spawned ones and and return the error.
-            ErrMsg = "~p failed to spawn index builders: ~p ~p",
-            couch_log:error(ErrMsg, [?MODULE, Error, Indices]),
-            lists:foreach(
-                fun({ok, Pid}) ->
-                    catch unlink(Pid),
-                    catch exit(Pid, kill)
-                end,
-                Oks
-            ),
-            {error, Error}
-    end.
+    Retries = max_retries(),
+    [spawn_link(?MODULE, build_index, [Idx, Retries]) || Idx <- Indices].
 
 %% Private API
 
@@ -83,7 +70,7 @@ mrview_indices(DbName, Doc) ->
         Views = couch_mrview_index:get(views, MRSt),
         case Views =/= [] of
             true ->
-                [{mrview, DbName, MRSt}];
+                [{?MRVIEW, DbName, MRSt}];
             false ->
                 []
         end
@@ -97,7 +84,7 @@ mrview_indices(DbName, Doc) ->
 dreyfus_indices(DbName, Doc) ->
     try
         Indices = dreyfus_index:design_doc_to_indexes(Doc),
-        [{dreyfus, DbName, Index} || Index <- Indices]
+        [{?DREYFUS, DbName, Index} || Index <- Indices]
     catch
         Tag:Err ->
             Msg = "~p couldn't get dreyfus indices ~p ~p ~p:~p",
@@ -108,7 +95,7 @@ dreyfus_indices(DbName, Doc) ->
 hastings_indices(DbName, Doc) ->
     try
         Indices = hastings_index:design_doc_to_indexes(Doc),
-        [{hastings, DbName, Index} || Index <- Indices]
+        [{?HASTINGS, DbName, Index} || Index <- Indices]
     catch
         Tag:Err ->
             Msg = "~p couldn't get hasting indices ~p ~p ~p:~p",
@@ -116,33 +103,71 @@ hastings_indices(DbName, Doc) ->
             []
     end.
 
-build_index({mrview, DbName, MRSt}) ->
-    case couch_index_server:get_index(couch_mrview_index, MRSt) of
-        {ok, Pid} ->
-            Args = [Pid, get_update_seq(DbName)],
-            WPid = spawn_link(couch_index, get_state, Args),
-            {ok, WPid};
-        Error ->
-            Error
-    end;
-build_index({dreyfus, DbName, Index}) ->
-    case dreyfus_index_manager:get_index(DbName, Index) of
-        {ok, Pid} ->
-            Args = [Pid, get_update_seq(DbName)],
-            WPid = spawn_link(dreyfus_index, await, Args),
-            {ok, WPid};
-        Error ->
-            Error
+build_index({?MRVIEW, _DbName, MRSt} = Ctx, Try) ->
+    await_retry(
+        couch_index_server:get_index(couch_mrview_index, MRSt),
+        fun couch_index:get_state/2,
+        Ctx,
+        Try
+    );
+build_index({?DREYFUS, DbName, DIndex} = Ctx, Try) ->
+    await_retry(
+        dreyfus_index_manager:get_index(DbName, DIndex),
+        fun dreyfus_index:await/2,
+        Ctx,
+        Try
+    );
+build_index({?HASTINGS, DbName, HIndex} = Ctx, Try) ->
+    await_retry(
+        hastings_index_manager:get_index(DbName, HIndex),
+        fun hastings_index:await/2,
+        Ctx,
+        Try
+    ).
+
+await_retry({ok, Pid}, AwaitIndex, {_, DbName, _} = Ctx, Try) ->
+    try AwaitIndex(Pid, get_update_seq(DbName)) of
+        {ok, _} -> ok;
+        {ok, _, _} -> ok;
+        AwaitError -> maybe_retry(Ctx, AwaitError, Try)
+    catch
+        _:CatchError ->
+            maybe_retry(Ctx, CatchError, Try)
     end;
-build_index({hastings, DbName, Index}) ->
-    case hastings_index_manager:get_index(DbName, Index) of
-        {ok, Pid} ->
-            Args = [Pid, get_update_seq(DbName)],
-            WPid = spawn_link(hastings_index, await, Args),
-            {ok, WPid};
-        Error ->
-            Error
-    end.
+await_retry(OpenError, _AwaitIndex, Ctx, Try) ->
+    maybe_retry(Ctx, OpenError, Try).
+
+maybe_retry(Ctx, killed = Error, Try) ->
+    retry(Ctx, Error, Try);
+maybe_retry(Ctx, {killed, _} = Error, Try) ->
+    retry(Ctx, Error, Try);
+maybe_retry(Ctx, shutdown = Error, Try) ->
+    retry(Ctx, Error, Try);
+maybe_retry(Ctx, Error, 0) ->
+    fail(Ctx, Error);
+maybe_retry(Ctx, Error, Try) when is_integer(Try), Try > 0 ->
+    retry(Ctx, Error, Try - 1).
+
+retry(Ctx, Error, Try) ->
+    IndexInfo = index_info(Ctx),
+    LogMsg = "~p : error ~p when building ~p, retrying (~p)",
+    couch_log:warning(LogMsg, [?MODULE, Error, IndexInfo, Try]),
+    timer:sleep(retry_interval_sec() * 1000),
+    build_index(Ctx, Try).
+
+fail(Ctx, Error) ->
+    IndexInfo = index_info(Ctx),
+    LogMsg = "~p : error ~p when building ~p, max tries exceeded, failing",
+    couch_log:error(LogMsg, [?MODULE, Error, IndexInfo]),
+    exit({error_building_index, IndexInfo}).
+
+index_info({?MRVIEW, DbName, MRSt}) ->
+    GroupName = couch_mrview_index:get(idx_name, MRSt),
+    {DbName, GroupName};
+index_info({?DREYFUS, DbName, Index}) ->
+    {DbName, Index};
+index_info({?HASTINGS, DbName, Index}) ->
+    {DbName, Index}.
 
 has_app(App) ->
     code:lib_dir(App) /= {error, bad_name}.
@@ -151,3 +176,9 @@ get_update_seq(DbName) ->
     couch_util:with_db(DbName, fun(Db) ->
         couch_db:get_update_seq(Db)
     end).
+
+max_retries() ->
+    config:get_integer("reshard", "index_max_retries", 5).
+
+retry_interval_sec() ->
+    config:get_integer("reshard", "index_retry_interval_sec", 10).
diff --git a/src/mem3/src/mem3_reshard_job.erl 
b/src/mem3/src/mem3_reshard_job.erl
index aff5c2648..a9fb48134 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -408,29 +408,17 @@ topoff_impl(#job{source = #shard{} = Source, target = 
Targets}) ->
 build_indices(#job{} = Job) ->
     #job{
         source = #shard{name = SourceName} = Source,
-        target = Targets,
-        retries = Retries,
-        state_info = Info
+        target = Targets
     } = Job,
     check_source_exists(Source, build_indices),
     {ok, DDocs} = mem3_reshard_index:design_docs(SourceName),
     Indices = mem3_reshard_index:target_indices(DDocs, Targets),
     case mem3_reshard_index:spawn_builders(Indices) of
-        {ok, []} ->
+        [] ->
             % Skip the log spam if this is a no-op
             Job#job{workers = []};
-        {ok, Pids} ->
-            report(Job#job{workers = Pids});
-        {error, Error} ->
-            case Job#job.retries =< max_retries() of
-                true ->
-                    build_indices(Job#job{
-                        retries = Retries + 1,
-                        state_info = info_update(error, Error, Info)
-                    });
-                false ->
-                    exit(Error)
-            end
+        [_ | _] = Pids ->
+            report(Job#job{workers = Pids})
     end.
 
 copy_local_docs(#job{split_state = copy_local_docs} = Job) ->
@@ -612,7 +600,7 @@ check_targets_exist(Targets, StateName) ->
 
 -spec max_retries() -> integer().
 max_retries() ->
-    config:get_integer("reshard", "max_retries", 1).
+    config:get_integer("reshard", "max_retries", 5).
 
 -spec retry_interval_sec() -> integer().
 retry_interval_sec() ->
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl 
b/src/mem3/test/eunit/mem3_reshard_test.erl
index 1929242bb..be539b47a 100644
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -37,13 +37,15 @@ setup() ->
     create_db(Db1, [{q, 1}, {n, 1}]),
     PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}],
     create_db(Db2, [{q, 1}, {n, 1}, {props, PartProps}]),
-    config:set("reshard", "retry_interval_sec", "0", _Persist = false),
+    config:set("reshard", "retry_interval_sec", "0", _Persist1 = false),
+    config:set("reshard", "index_retry_interval_sec", "0", _Persist2 = false),
     #{db1 => Db1, db2 => Db2}.
 
 teardown(#{} = Dbs) ->
     mem3_reshard:reset_state(),
     maps:map(fun(_, Db) -> delete_db(Db) end, Dbs),
-    config:delete("reshard", "retry_interval_sec", _Persist = false),
+    config:delete("reshard", "index_retry_interval_sec", _Persist1 = false),
+    config:delete("reshard", "retry_interval_sec", _Persist2 = false),
     meck:unload().
 
 start_couch() ->
@@ -68,6 +70,7 @@ mem3_reshard_db_test_() ->
                     fun split_shard_with_lots_of_purges/1,
                     fun update_docs_before_topoff1/1,
                     fun indices_are_built/1,
+                    fun indices_can_be_built_with_errors/1,
                     fun split_partitioned_db/1,
                     fun split_twice/1,
                     fun couch_events_are_emitted/1,
@@ -273,6 +276,47 @@ indices_are_built(#{db1 := Db}) ->
             end
         end)}.
 
+% Test that indices are built despite intermittent errors.
+indices_can_be_built_with_errors(#{db1 := Db}) ->
+    {timeout, ?TIMEOUT,
+        ?_test(begin
+            add_test_docs(Db, #{docs => 10, mrview => 2, search => 2, geo => 
2}),
+            [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
+            meck:expect(
+                couch_index_server,
+                get_index,
+                2,
+                meck:seq([
+                    meck:raise(error, foo_reason),
+                    meck:raise(exit, killed),
+                    meck:passthrough()
+                ])
+            ),
+            meck:expect(
+                couch_index,
+                get_state,
+                2,
+                meck:seq([
+                    meck:raise(error, bar_reason),
+                    meck:raise(exit, killed),
+                    meck:val({not_ok, other}),
+                    meck:passthrough()
+                ])
+            ),
+            {ok, JobId} = mem3_reshard:start_split_job(Shard),
+            wait_state(JobId, completed),
+            % Normally would expect 4 (2 shards x 2 mrviews), but there were 2
+            % failures in get_index/2 and 3 in get_state/2 for a total of 4 + 5 = 9
+            ?assertEqual(9, meck:num_calls(couch_index_server, get_index, 2)),
+            % Normally would be 4 calls (2 shards x 2 mrviews), but there were
+            % 3 extra failures in get_state/2 for a total of 4 + 3 = 7
+            ?assertEqual(7, meck:num_calls(couch_index, get_state, 2)),
+            Shards1 = lists:sort(mem3:local_shards(Db)),
+            ?assertEqual(2, length(Shards1)),
+            MRViewGroupInfo = get_group_info(Db, <<"_design/mrview00000">>),
+            ?assertMatch(#{<<"update_seq">> := 32}, MRViewGroupInfo)
+        end)}.
+
 mock_dreyfus_indices() ->
     meck:expect(dreyfus_index, design_doc_to_indexes, fun(Doc) ->
         #doc{body = {BodyProps}} = Doc,
@@ -284,7 +328,7 @@ mock_dreyfus_indices() ->
         end
     end),
     meck:expect(dreyfus_index_manager, get_index, fun(_, _) -> {ok, pid} end),
-    meck:expect(dreyfus_index, await, fun(_, _) -> ok end).
+    meck:expect(dreyfus_index, await, fun(_, _) -> {ok, indexpid, someseq} 
end).
 
 mock_hastings_indices() ->
     meck:expect(hastings_index, design_doc_to_indexes, fun(Doc) ->
@@ -297,7 +341,7 @@ mock_hastings_indices() ->
         end
     end),
     meck:expect(hastings_index_manager, get_index, fun(_, _) -> {ok, pid} end),
-    meck:expect(hastings_index, await, fun(_, _) -> ok end).
+    meck:expect(hastings_index, await, fun(_, _) -> {ok, someseq} end).
 
 % Split partitioned database
 split_partitioned_db(#{db2 := Db}) ->
@@ -504,7 +548,7 @@ retries_work(#{db1 := Db}) ->
             {ok, JobId} = mem3_reshard:start_split_job(Shard),
 
             wait_state(JobId, failed),
-            ?assertEqual(3, meck:num_calls(couch_db_split, split, 3))
+            ?assertEqual(7, meck:num_calls(couch_db_split, split, 3))
         end)}.
 
 target_reset_in_initial_copy(#{db1 := Db}) ->

Reply via email to