This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch prevent-local-duplicate-jobs-error
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 770faed94091325cfccb9ffbe7a4bf75388a3b46
Author: Nick Vatamaniuc <vatam...@gmail.com>
AuthorDate: Wed Apr 24 00:35:56 2024 -0400

    Remove replication job supervisor
    
    Use the scheduler as the job supervisor, since the scheduler is already a fancy
    supervisor, with its own backoff logic, process monitoring, etc.
    
    This simplifies the job starting/stopping logic and fixes a bug where the
    simple_one_for_one supervisor could restart a job, but the scheduler would
    consider it not running, and try to start another job with the same replication
    ID on the same node. Since jobs register themselves in pg, the second job would
    keep crashing with a duplicate_job error the first time it tried to checkpoint.
---
 src/couch_replicator/README.md                     |  5 --
 src/couch_replicator/src/couch_replicator.app.src  |  1 -
 src/couch_replicator/src/couch_replicator.hrl      |  1 -
 .../src/couch_replicator_job_sup.erl               | 34 ---------
 .../src/couch_replicator_scheduler.erl             | 81 ++++++++--------------
 .../src/couch_replicator_scheduler_job.erl         | 16 ++++-
 .../src/couch_replicator_scheduler_sup.erl         | 59 ----------------
 src/couch_replicator/src/couch_replicator_sup.erl  |  4 --
 8 files changed, 45 insertions(+), 156 deletions(-)

diff --git a/src/couch_replicator/README.md b/src/couch_replicator/README.md
index 6a2a5cfdd..b3687ed6f 100644
--- a/src/couch_replicator/README.md
+++ b/src/couch_replicator/README.md
@@ -183,11 +183,6 @@ A description of each child:
     healthy again. The next time it crashes its sequence of consecutive crashes
     will restart at 1.
 
- * `couch_replicator_scheduler_sup`: This module is a supervisor for running
-   replication tasks. The most interesting thing about it is perhaps that it is
-   not used to restart children. The scheduler handles restarts and error
-   handling backoffs.
-
  * `couch_replicator_doc_processor`: The doc processor component is in charge
    of processing replication document updates, turning them into replication
    jobs and adding those jobs to the scheduler. Unfortunately the only reason
diff --git a/src/couch_replicator/src/couch_replicator.app.src b/src/couch_replicator/src/couch_replicator.app.src
index 7cc0fd108..6b119bfa9 100644
--- a/src/couch_replicator/src/couch_replicator.app.src
+++ b/src/couch_replicator/src/couch_replicator.app.src
@@ -21,7 +21,6 @@
         couch_replication,  % couch_replication_event gen_event
         couch_replicator_clustering,
         couch_replicator_scheduler,
-        couch_replicator_scheduler_sup,
         couch_replicator_doc_processor
     ]},
     {applications, [
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
index 7c39c7c95..0648d66ae 100644
--- a/src/couch_replicator/src/couch_replicator.hrl
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -54,6 +54,5 @@
     id :: job_id() | '$1' | '_',
     rep :: #rep{} | '_',
     pid :: undefined | pid() | '$1' | '_',
-    monitor :: undefined | reference() | '_',
     history :: history() | '_'
 }).
diff --git a/src/couch_replicator/src/couch_replicator_job_sup.erl b/src/couch_replicator/src/couch_replicator_job_sup.erl
deleted file mode 100644
index e3d15c041..000000000
--- a/src/couch_replicator/src/couch_replicator_job_sup.erl
+++ /dev/null
@@ -1,34 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_job_sup).
-
--behaviour(supervisor).
-
--export([
-    init/1,
-    start_link/0
-]).
-
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-%%=============================================================================
-%% supervisor callbacks
-%%=============================================================================
-
-init([]) ->
-    {ok, {{one_for_one, 3, 10}, []}}.
-
-%%=============================================================================
-%% internal functions
-%%=============================================================================
diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl
index aac5c342f..2296932eb 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler.erl
@@ -212,6 +212,7 @@ get_interval_msec() ->
 %% gen_server functions
 
 init(_) ->
+    process_flag(trap_exit, true),
     config:enable_feature('scheduler'),
     EtsOpts = [
         named_table,
@@ -316,7 +317,10 @@ handle_info(reschedule, State) ->
     erlang:cancel_timer(State#state.timer),
     Timer = erlang:send_after(State#state.interval, self(), reschedule),
     {noreply, State#state{timer = Timer}};
-handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
+handle_info({'EXIT', Pid, Reason}, #state{stats_pid = Pid} = State) ->
+    couch_log:error("~p : stats updater died: ~p", [?MODULE, Reason]),
+    {stop, {status_updater_died, Reason}, State};
+handle_info({'EXIT', Pid, normal}, State) ->
     {ok, Job} = job_by_pid(Pid),
     couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]),
     Interval = State#state.interval,
@@ -324,7 +328,7 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) ->
     remove_job_int(Job),
     update_running_jobs_stats(State#state.stats_pid),
     {noreply, State};
-handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) ->
+handle_info({'EXIT', Pid, Reason0}, State) ->
     {ok, Job} = job_by_pid(Pid),
     Reason =
         case Reason0 of
@@ -623,29 +627,18 @@ start_job_int(#job{pid = Pid}, _State) when Pid /= undefined ->
     ok;
 start_job_int(#job{} = Job0, State) ->
     Job = maybe_optimize_job_for_rate_limiting(Job0),
-    case couch_replicator_scheduler_sup:start_child(Job#job.rep) of
+    case couch_replicator_scheduler_job:start_link(Job#job.rep) of
         {ok, Child} ->
-            Ref = monitor(process, Child),
-            ok = update_state_started(Job, Child, Ref, State),
+            ok = update_state_started(Job, Child, State),
             couch_log:notice(
                 "~p: Job ~p started as ~p",
                 [?MODULE, Job#job.id, Child]
             );
-        {error, {already_started, OtherPid}} when node(OtherPid) =:= node() ->
-            Ref = monitor(process, OtherPid),
-            ok = update_state_started(Job, OtherPid, Ref, State),
-            couch_log:notice(
-                "~p: Job ~p already running as ~p. Most likely"
-                " because replicator scheduler was restarted",
-                [?MODULE, Job#job.id, OtherPid]
-            );
-        {error, {already_started, OtherPid}} when node(OtherPid) =/= node() ->
-            CrashMsg = "Duplicate replication running on another node",
-            couch_log:notice(
-                "~p: Job ~p already running as ~p. Most likely"
-                " because a duplicate replication is running on another node",
-                [?MODULE, Job#job.id, OtherPid]
-            ),
+        {error, {already_started, OtherPid}} ->
+            Node = node(OtherPid),
+            CrashMsg = "Duplicate replication running on " ++ atom_to_list(Node),
+            LogMsg = "~p: Job ~p already running as ~p on node ~s",
+            couch_log:warning(LogMsg, [?MODULE, Job#job.id, OtherPid, Node]),
             ok = update_state_crashed(Job, CrashMsg, State);
         {error, Reason} ->
             couch_log:notice(
@@ -658,14 +651,10 @@ start_job_int(#job{} = Job0, State) ->
 -spec stop_job_int(#job{}, #state{}) -> ok | {error, term()}.
 stop_job_int(#job{pid = undefined}, _State) ->
     ok;
-stop_job_int(#job{} = Job, State) ->
-    ok = couch_replicator_scheduler_sup:terminate_child(Job#job.pid),
-    demonitor(Job#job.monitor, [flush]),
+stop_job_int(#job{pid = Pid} = Job, State) when is_pid(Pid) ->
+    ok = couch_replicator_scheduler_job:stop(Pid),
     ok = update_state_stopped(Job, State),
-    couch_log:notice(
-        "~p: Job ~p stopped as ~p",
-        [?MODULE, Job#job.id, Job#job.pid]
-    ).
+    couch_log:notice("~p: Job ~p stopped as ~p", [?MODULE, Job#job.id, Pid]).
 
 -spec remove_job_int(#job{}) -> true.
 remove_job_int(#job{} = Job) ->
@@ -704,15 +693,15 @@ job_by_id(Id) ->
 
 -spec update_state_stopped(#job{}, #state{}) -> ok.
 update_state_stopped(Job, State) ->
-    Job1 = reset_job_process(Job),
+    Job1 = Job#job{pid = undefined},
     Job2 = update_history(Job1, stopped, os:timestamp(), State),
     true = ets:insert(?MODULE, Job2),
     couch_stats:increment_counter([couch_replicator, jobs, stops]),
     ok.
 
--spec update_state_started(#job{}, pid(), reference(), #state{}) -> ok.
-update_state_started(Job, Pid, Ref, State) ->
-    Job1 = set_job_process(Job, Pid, Ref),
+-spec update_state_started(#job{}, pid(), #state{}) -> ok.
+update_state_started(Job, Pid, State) when is_pid(Pid) ->
+    Job1 = Job#job{pid = Pid},
     Job2 = update_history(Job1, started, os:timestamp(), State),
     true = ets:insert(?MODULE, Job2),
     couch_stats:increment_counter([couch_replicator, jobs, starts]),
@@ -720,20 +709,12 @@ update_state_started(Job, Pid, Ref, State) ->
 
 -spec update_state_crashed(#job{}, any(), #state{}) -> ok.
 update_state_crashed(Job, Reason, State) ->
-    Job1 = reset_job_process(Job),
+    Job1 = Job#job{pid = undefined},
     Job2 = update_history(Job1, {crashed, Reason}, os:timestamp(), State),
     true = ets:insert(?MODULE, Job2),
     couch_stats:increment_counter([couch_replicator, jobs, crashes]),
     ok.
 
--spec set_job_process(#job{}, pid(), reference()) -> #job{}.
-set_job_process(#job{} = Job, Pid, Ref) when is_pid(Pid), is_reference(Ref) ->
-    Job#job{pid = Pid, monitor = Ref}.
-
--spec reset_job_process(#job{}) -> #job{}.
-reset_job_process(#job{} = Job) ->
-    Job#job{pid = undefined, monitor = undefined}.
-
 -spec reschedule(#state{}) -> ok.
 reschedule(#state{interval = Interval} = State) ->
     couch_replicator_share:update(running_jobs(), Interval, os:timestamp()),
@@ -1253,7 +1234,7 @@ t_jobs_dont_churn_if_there_are_available_running_slots() ->
         reschedule(mock_state(2, 2)),
         ?assertEqual({2, 0}, run_stop_count()),
         ?assertEqual([], jobs_stopped()),
-        ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1))
+        ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_job, start_link, 1))
     end).
 
 t_start_only_pending_jobs_do_not_churn_existing_ones() ->
@@ -1263,7 +1244,7 @@ t_start_only_pending_jobs_do_not_churn_existing_ones() ->
             continuous_running(2)
         ]),
         reschedule(mock_state(2, 2)),
-        ?assertEqual(1, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1)),
+        ?assertEqual(1, meck:num_calls(couch_replicator_scheduler_job, start_link, 1)),
         ?assertEqual([], jobs_stopped()),
         ?assertEqual({2, 0}, run_stop_count())
     end).
@@ -1379,7 +1360,7 @@ t_if_transient_job_crashes_it_gets_removed() ->
         ?assertEqual(1, ets:info(?MODULE, size)),
         State = #state{max_history = 3, stats_pid = self()},
         {noreply, State} = handle_info(
-            {'DOWN', r1, process, Pid, failed},
+            {'EXIT', Pid, failed},
             State
         ),
         ?assertEqual(0, ets:info(?MODULE, size))
@@ -1403,7 +1384,7 @@ t_if_permanent_job_crashes_it_stays_in_ets() ->
             stats_pid = self()
         },
         {noreply, State} = handle_info(
-            {'DOWN', r1, process, Pid, failed},
+            {'EXIT', Pid, failed},
             State
         ),
         ?assertEqual(1, ets:info(?MODULE, size)),
@@ -1532,11 +1513,11 @@ setup_all() ->
     meck:expect(couch_log, notice, 2, ok),
     meck:expect(couch_log, warning, 2, ok),
     meck:expect(couch_log, error, 2, ok),
-    meck:expect(couch_replicator_scheduler_sup, terminate_child, 1, ok),
+    meck:expect(couch_replicator_scheduler_job, stop, 1, ok),
     meck:expect(couch_stats, increment_counter, 1, ok),
     meck:expect(couch_stats, update_gauge, 2, ok),
     Pid = mock_pid(),
-    meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}),
+    meck:expect(couch_replicator_scheduler_job, start_link, 1, {ok, Pid}),
     couch_replicator_share:init().
 
 teardown_all(_) ->
@@ -1547,7 +1528,7 @@ teardown_all(_) ->
 setup() ->
     meck:reset([
         couch_log,
-        couch_replicator_scheduler_sup,
+        couch_replicator_scheduler_job,
         couch_stats,
         config
     ]).
@@ -1632,8 +1613,7 @@ continuous_running(Id) when is_integer(Id) ->
         id = Id,
         history = [started(Started), added()],
         rep = continuous_rep(),
-        pid = Pid,
-        monitor = monitor(process, Pid)
+        pid = Pid
     }.
 
 oneshot(Id) when is_integer(Id) ->
@@ -1648,8 +1628,7 @@ oneshot_running(Id) when is_integer(Id) ->
         id = Id,
         history = [started(Started), added()],
         rep = rep(),
-        pid = Pid,
-        monitor = monitor(process, Pid)
+        pid = Pid
     }.
 
 testjob(Hist) when is_list(Hist) ->
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index dab8f8a48..ad4656782 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -15,7 +15,8 @@
 -behaviour(gen_server).
 
 -export([
-    start_link/1
+    start_link/1,
+    stop/1
 ]).
 
 -export([
@@ -102,6 +103,19 @@ start_link(#rep{id = Id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
             {error, {already_started, OtherPid}}
     end.
 
+stop(Pid) when is_pid(Pid) ->
+    Ref = erlang:monitor(process, Pid),
+    unlink(Pid),
+    ok = gen_server:stop(Pid, shutdown, infinity),
+    receive
+        {'DOWN', Ref, _, _, Reason} -> Reason
+    end,
+    receive
+        {'EXIT', Pid, _} -> ok
+    after 0 -> ok
+    end,
+    ok.
+
 init(InitArgs) ->
     {ok, InitArgs, 0}.
 
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
deleted file mode 100644
index 1d5104312..000000000
--- a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl
+++ /dev/null
@@ -1,59 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-%   http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_scheduler_sup).
-
--behaviour(supervisor).
-
-%% public api
--export([
-    start_link/0,
-    start_child/1,
-    terminate_child/1
-]).
-
-%% supervisor api
--export([
-    init/1
-]).
-
-%% includes
--include("couch_replicator.hrl").
-
-%% public functions
-
-start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-
-start_child(#rep{} = Rep) ->
-    supervisor:start_child(?MODULE, [Rep]).
-
-terminate_child(Pid) ->
-    supervisor:terminate_child(?MODULE, Pid).
-
-%% supervisor functions
-
-init(_Args) ->
-    Start = {couch_replicator_scheduler_job, start_link, []},
-    % A crashed job is not entitled to immediate restart.
-    Restart = temporary,
-    Shutdown = 5000,
-    Type = worker,
-    Modules = [couch_replicator_scheduler_job],
-
-    RestartStrategy = simple_one_for_one,
-    MaxR = 10,
-    MaxT = 3,
-
-    ChildSpec =
-        {undefined, Start, Restart, Shutdown, Type, Modules},
-    {ok, {{RestartStrategy, MaxR, MaxT}, [ChildSpec]}}.
diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl
index d605b5c05..a8f2549d5 100644
--- a/src/couch_replicator/src/couch_replicator_sup.erl
+++ b/src/couch_replicator/src/couch_replicator_sup.erl
@@ -34,7 +34,6 @@ init(_Args) ->
         worker(couch_replicator_clustering),
         worker(couch_replicator_connection),
         worker(couch_replicator_rate_limiter),
-        sup(couch_replicator_scheduler_sup),
         worker(couch_replicator_scheduler),
         worker(couch_replicator_doc_processor),
         #{
@@ -48,6 +47,3 @@ init(_Args) ->
 
 worker(Mod) ->
     #{id => Mod, start => {Mod, start_link, []}}.
-
-sup(Mod) ->
-    #{id => Mod, type => supervisor, start => {Mod, start_link, []}, shutdown => infinity}.

Reply via email to