This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch fix-mem3-stuck-job-after-nodes-change in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 21efe5392a36bf73a70b55da738ce56f56b74a29 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Fri Aug 1 17:24:13 2025 -0400 Stop replication jobs to nodes which are not part of the cluster Previously, if users updated their shard map and removed a node, it was possible for an in-progress internal replication job to be stuck forever trying to replicate to the now non-existent node. To fix it, do what we did for dbs a while back -- if the db or nodes are not current, then stop trying to replicate to them. Fix #5612 --- src/mem3/src/mem3_sync.erl | 43 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl index 04e4d1889..67eb77181 100644 --- a/src/mem3/src/mem3_sync.erl +++ b/src/mem3/src/mem3_sync.erl @@ -162,7 +162,7 @@ handle_info({'EXIT', Active, Reason}, State) -> {pending_changes, Count} -> maybe_resubmit(State, Job#job{pid = nil, count = Count}); _ -> - case mem3:db_is_current(Job#job.name) of + case is_job_current(Job, nodes(), mem3:nodes()) of true -> timer:apply_after(5000, ?MODULE, push, [Job#job{pid = nil}]); false -> @@ -390,6 +390,14 @@ maybe_redirect(Node) -> list_to_existing_atom(Redirect) end. +% Check that the job's db is still current (mem3:db_is_current/1) and that its +% target node is either connected (in ConnectedNodes) or in the cluster (in Mem3Nodes). +is_job_current(#job{name = Name, node = Node}, ConnectedNodes, Mem3Nodes) -> + DbCurrent = mem3:db_is_current(Name), + Connected = lists:member(Node, ConnectedNodes), + InMem3Nodes = lists:member(Node, Mem3Nodes), + DbCurrent andalso (Connected orelse InMem3Nodes). + -ifdef(TEST). -include_lib("couch/include/couch_eunit.hrl"). @@ -404,4 +412,37 @@ find_next_node_test() -> ?assertEqual(x, find_next_node(n, [n, x], [n, x])), ?assertEqual(a, find_next_node(n, [a, n, x], [a, n, y])). +is_job_current_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_is_job_current) + ] + }.
+
+setup() ->
+    Ctx = test_util:start_couch(),
+    DbName = ?tempdb(),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), % real db for the test job to target
+    couch_db:close(Db), % the tests only use DbName, not the handle
+    {Ctx, DbName}. % fixture state passed to teardown/1 and t_is_job_current/1
+
+teardown({Ctx, DbName}) ->
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]), % remove the temp db created in setup/0
+    test_util:stop_couch(Ctx).
+
+t_is_job_current({_, DbName}) ->
+    Job = #job{name = DbName, node = n1}, % job for the temp db, targeting node n1
+    ?assert(is_job_current(Job, [], [n1])), % n1 only in mem3 nodes
+    ?assert(is_job_current(Job, [n1], [])), % n1 only connected
+    ?assert(is_job_current(Job, [n1], [n1])), % n1 connected and in mem3 nodes
+    ?assertNot(is_job_current(Job, [n2], [])), % only a different node is connected
+    ?assertNot(is_job_current(Job, [], [n2])), % only a different node is in mem3 nodes
+    ?assertNot(is_job_current(Job, [], [])), % no nodes known at all
+    ?assertNot(is_job_current(Job, [n2], [n2])), % n1 is neither connected nor in mem3 nodes
+    ?assertNot(is_job_current(Job#job{name = <<"x">>}, [n1], [n1])), % setup/0 never creates <<"x">>
+    ?assertNot(is_job_current(Job#job{name = <<"x">>}, [], [])). % missing db and unknown node
+
 -endif.
