This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch fix-mem3-stuck-job-after-nodes-change
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 21efe5392a36bf73a70b55da738ce56f56b74a29
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Fri Aug 1 17:24:13 2025 -0400

    Stop replication jobs to nodes which are not part of the cluster
    
    Previously, if users updated their shard map and removed a node, it
    was possible for an in-progress internal replication job to be stuck forever
    trying to replicate to the now non-existent node.
    
    To fix it, do what we did for dbs a while back -- if the db or nodes are not
    current, then stop trying to replicate to them.
    
    Fix #5612
---
 src/mem3/src/mem3_sync.erl | 43 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 42 insertions(+), 1 deletion(-)

diff --git a/src/mem3/src/mem3_sync.erl b/src/mem3/src/mem3_sync.erl
index 04e4d1889..67eb77181 100644
--- a/src/mem3/src/mem3_sync.erl
+++ b/src/mem3/src/mem3_sync.erl
@@ -162,7 +162,7 @@ handle_info({'EXIT', Active, Reason}, State) ->
                     {pending_changes, Count} ->
                         maybe_resubmit(State, Job#job{pid = nil, count = Count});
                     _ ->
-                        case mem3:db_is_current(Job#job.name) of
+                        case is_job_current(Job, nodes(), mem3:nodes()) of
                             true ->
                                 timer:apply_after(5000, ?MODULE, push, [Job#job{pid = nil}]);
                             false ->
@@ -390,6 +390,14 @@ maybe_redirect(Node) ->
             list_to_existing_atom(Redirect)
     end.
 
+% A job is current when mem3:db_is_current/1 holds for its db and its
+% target node appears in ConnectedNodes or in Mem3Nodes (or in both).
+is_job_current(#job{name = DbName, node = Target}, ConnectedNodes, Mem3Nodes) ->
+    IsDbCurrent = mem3:db_is_current(DbName),
+    IsConnected = lists:member(Target, ConnectedNodes),
+    IsClusterNode = lists:member(Target, Mem3Nodes),
+    IsDbCurrent andalso (IsClusterNode orelse IsConnected).
+
 -ifdef(TEST).
 
 -include_lib("couch/include/couch_eunit.hrl").
@@ -404,4 +412,37 @@ find_next_node_test() ->
     ?assertEqual(x, find_next_node(n, [n, x], [n, x])),
     ?assertEqual(a, find_next_node(n, [a, n, x], [a, n, y])).
 
+% EUnit generator for is_job_current/3. The foreach fixture wraps each
+% listed case with a setup/0 and teardown/1 pair.
+is_job_current_test_() ->
+    Cases = [
+        ?TDEF_FE(t_is_job_current)
+    ],
+    Setup = fun setup/0,
+    Teardown = fun teardown/1,
+    {foreach, Setup, Teardown, Cases}.
+
+setup() ->
+    CouchCtx = test_util:start_couch(),
+    TempDb = ?tempdb(),
+    {ok, Handle} = couch_db:create(TempDb, [?ADMIN_CTX]),
+    couch_db:close(Handle),
+    {CouchCtx, TempDb}. % context and db name, as consumed by teardown/1
+
+teardown({CouchCtx, TempDb}) ->
+    ok = couch_server:delete(TempDb, [?ADMIN_CTX]),
+    test_util:stop_couch(CouchCtx).
+
+t_is_job_current({_, DbName}) ->
+    Job = #job{name = DbName, node = n1},
+    Other = Job#job{name = <<"x">>},
+    % Expected current: n1 is in at least one of the two node lists.
+    Current = [{[], [n1]}, {[n1], []}, {[n1], [n1]}],
+    [?assert(is_job_current(Job, C, M)) || {C, M} <- Current],
+    % Expected not current: n1 is in neither list, or the db name is <<"x">>.
+    Stale = [{[n2], []}, {[], [n2]}, {[], []}, {[n2], [n2]}],
+    [?assertNot(is_job_current(Job, C, M)) || {C, M} <- Stale],
+    ?assertNot(is_job_current(Other, [n1], [n1])),
+    ?assertNot(is_job_current(Other, [], [])).
+
 -endif.

Reply via email to