This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch handle-database-recreation-case-in-mem3 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 2ac72cdffbaa096b0de7f31c0a5e229e95d5def3 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Tue Apr 30 19:28:12 2019 -0400 Handle database re-creation edge case in internal replicator Previously, if a database was deleted and re-created while the internal replication request was pending, the job would have been retried continuously. The mem3_rep:targets_map/2 function would return an empty targets map, and mem3_rep:go would raise a function clause exception if the database was present but it was an older "incarnation" of it (with shards living on different target nodes). Because it was an exception and not an {error, ...} result, the process would exit with an error. Subsequently, mem3_sync would try to handle the process exit and check if the database was deleted, but it didn't account for the case when the database was re-created, so it would resubmit the job into the queue again. To fix it, we introduce a function to check if the database shard is part of the current database shard map. Then perform the check both before building the targets map and also on job retries. --- src/mem3/src/mem3.erl | 15 +++++++++++++++ src/mem3/src/mem3_rep.erl | 8 +++++++- src/mem3/src/mem3_sync.erl | 11 ++++++----- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl index dc666fd..333739d 100644 --- a/src/mem3/src/mem3.erl +++ b/src/mem3/src/mem3.erl @@ -22,6 +22,7 @@ -export([belongs/2, owner/3]). -export([get_placement/1]). -export([ping/1, ping/2]). +-export([db_is_current/1]). %% For mem3 use only. -export([name/1, node/1, range/1, engine/1]). @@ -367,6 +368,20 @@ ping(Node, Timeout) when is_atom(Node) -> pang end. + +db_is_current(#shard{name = Name}) -> + db_is_current(Name); + +db_is_current(<<"shards/", _/binary>> = Name) -> + try + Shards = mem3:shards(mem3:dbname(Name)), + lists:keyfind(Name, #shard.name, Shards) =/= false + catch + error:database_does_not_exist -> + false + end. + + -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index d5b42d3..d2edd6c 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -64,7 +64,13 @@ go(DbName, Node, Opts) when is_binary(DbName), is_atom(Node) -> go(#shard{name=DbName, node=node()}, #shard{name=DbName, node=Node}, Opts); go(#shard{} = Source, #shard{} = Target, Opts) -> - go(Source, targets_map(Source, Target), Opts); + case mem3:db_is_current(Source) of + true -> + go(Source, targets_map(Source, Target), Opts); + false -> + % Database could have been recreated + {error, missing_source} + end; go(#shard{} = Source, #{} = Targets0, Opts) when map_size(Targets0) > 0 -> Targets = maps:map(fun(_, T) -> #tgt{shard = T} end, Targets0), diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl index 693fc4f..8170f3c 100644 --- a/src/mem3/src/mem3_sync.erl +++ b/src/mem3/src/mem3_sync.erl @@ -140,11 +140,12 @@ handle_info({'EXIT', Active, Reason}, State) -> case Reason of {pending_changes, Count} -> maybe_resubmit(State, Job#job{pid = nil, count = Count}); _ -> - try mem3:shards(mem3:dbname(Job#job.name)) of _ -> - timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}]) - catch error:database_does_not_exist -> - % no need to retry - ok + case mem3:db_is_current(Job#job.name) of + true -> + timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}]); + false -> + % no need to retry (db deleted or recreated) + ok end, State end;
