This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch prevent-local-duplicate-jobs-error in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 770faed94091325cfccb9ffbe7a4bf75388a3b46 Author: Nick Vatamaniuc <vatam...@gmail.com> AuthorDate: Wed Apr 24 00:35:56 2024 -0400 Remove replication job supervisor Use the scheduler as the job supervisor, since the scheduler is already a fancy supervisor, with its own backoff logic, process monitoring, etc. This simplifies the job starting/stopping logic and fixes a bug where the simple_one_for_one supervisor could restart a job, but the scheduler would consider it not running, and try to start another job with the same replication ID on the same node. Since jobs register themselves in pg, the second job would keep crashing with duplicate_job error the first time it tried to checkpoint. --- src/couch_replicator/README.md | 5 -- src/couch_replicator/src/couch_replicator.app.src | 1 - src/couch_replicator/src/couch_replicator.hrl | 1 - .../src/couch_replicator_job_sup.erl | 34 --------- .../src/couch_replicator_scheduler.erl | 81 ++++++++-------------- .../src/couch_replicator_scheduler_job.erl | 16 ++++- .../src/couch_replicator_scheduler_sup.erl | 59 ---------------- src/couch_replicator/src/couch_replicator_sup.erl | 4 -- 8 files changed, 45 insertions(+), 156 deletions(-) diff --git a/src/couch_replicator/README.md b/src/couch_replicator/README.md index 6a2a5cfdd..b3687ed6f 100644 --- a/src/couch_replicator/README.md +++ b/src/couch_replicator/README.md @@ -183,11 +183,6 @@ A description of each child: healthy again. The next time it crashes its sequence of consecutive crashes will restart at 1. - * `couch_replicator_scheduler_sup`: This module is a supervisor for running - replication tasks. The most interesting thing about it is perhaps that it is - not used to restart children. The scheduler handles restarts and error - handling backoffs. - * `couch_replicator_doc_processor`: The doc processor component is in charge of processing replication document updates, turning them into replication jobs and adding those jobs to the scheduler. Unfortunately the only reason diff --git a/src/couch_replicator/src/couch_replicator.app.src b/src/couch_replicator/src/couch_replicator.app.src index 7cc0fd108..6b119bfa9 100644 --- a/src/couch_replicator/src/couch_replicator.app.src +++ b/src/couch_replicator/src/couch_replicator.app.src @@ -21,7 +21,6 @@ couch_replication, % couch_replication_event gen_event couch_replicator_clustering, couch_replicator_scheduler, - couch_replicator_scheduler_sup, couch_replicator_doc_processor ]}, {applications, [ diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl index 7c39c7c95..0648d66ae 100644 --- a/src/couch_replicator/src/couch_replicator.hrl +++ b/src/couch_replicator/src/couch_replicator.hrl @@ -54,6 +54,5 @@ id :: job_id() | '$1' | '_', rep :: #rep{} | '_', pid :: undefined | pid() | '$1' | '_', - monitor :: undefined | reference() | '_', history :: history() | '_' }). diff --git a/src/couch_replicator/src/couch_replicator_job_sup.erl b/src/couch_replicator/src/couch_replicator_job_sup.erl deleted file mode 100644 index e3d15c041..000000000 --- a/src/couch_replicator/src/couch_replicator_job_sup.erl +++ /dev/null @@ -1,34 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_job_sup). - --behaviour(supervisor). - --export([ - init/1, - start_link/0 -]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%%============================================================================= -%% supervisor callbacks -%%============================================================================= - -init([]) -> - {ok, {{one_for_one, 3, 10}, []}}. - -%%============================================================================= -%% internal functions -%%============================================================================= diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index aac5c342f..2296932eb 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -212,6 +212,7 @@ get_interval_msec() -> %% gen_server functions init(_) -> + process_flag(trap_exit, true), config:enable_feature('scheduler'), EtsOpts = [ named_table, @@ -316,7 +317,10 @@ handle_info(reschedule, State) -> erlang:cancel_timer(State#state.timer), Timer = erlang:send_after(State#state.interval, self(), reschedule), {noreply, State#state{timer = Timer}}; -handle_info({'DOWN', _Ref, process, Pid, normal}, State) -> +handle_info({'EXIT', Pid, Reason}, #state{stats_pid = Pid} = State) -> + couch_log:error("~p : stats updater died: ~p", [?MODULE, Reason]), + {stop, {status_updater_died, Reason}, State}; +handle_info({'EXIT', Pid, normal}, State) -> {ok, Job} = job_by_pid(Pid), couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]), Interval = State#state.interval, @@ -324,7 +328,7 @@ handle_info({'DOWN', _Ref, process, Pid, normal}, State) -> remove_job_int(Job), update_running_jobs_stats(State#state.stats_pid), {noreply, State}; -handle_info({'DOWN', _Ref, process, Pid, Reason0}, State) -> +handle_info({'EXIT', Pid, Reason0}, State) -> {ok, Job} = job_by_pid(Pid), Reason = case Reason0 of @@ -623,29 +627,18 @@ start_job_int(#job{pid = Pid}, _State) when Pid /= undefined -> ok; start_job_int(#job{} = Job0, State) -> Job = maybe_optimize_job_for_rate_limiting(Job0), - case couch_replicator_scheduler_sup:start_child(Job#job.rep) of + case couch_replicator_scheduler_job:start_link(Job#job.rep) of {ok, Child} -> - Ref = monitor(process, Child), - ok = update_state_started(Job, Child, Ref, State), + ok = update_state_started(Job, Child, State), couch_log:notice( "~p: Job ~p started as ~p", [?MODULE, Job#job.id, Child] ); - {error, {already_started, OtherPid}} when node(OtherPid) =:= node() -> - Ref = monitor(process, OtherPid), - ok = update_state_started(Job, OtherPid, Ref, State), - couch_log:notice( - "~p: Job ~p already running as ~p. Most likely" - " because replicator scheduler was restarted", - [?MODULE, Job#job.id, OtherPid] - ); - {error, {already_started, OtherPid}} when node(OtherPid) =/= node() -> - CrashMsg = "Duplicate replication running on another node", - couch_log:notice( - "~p: Job ~p already running as ~p. Most likely" - " because a duplicate replication is running on another node", - [?MODULE, Job#job.id, OtherPid] - ), + {error, {already_started, OtherPid}} -> + Node = node(OtherPid), + CrashMsg = "Duplicate replication running on " ++ atom_to_list(Node), + LogMsg = "~p: Job ~p already running as ~p on node ~s", + couch_log:warning(LogMsg, [?MODULE, Job#job.id, OtherPid, Node]), ok = update_state_crashed(Job, CrashMsg, State); {error, Reason} -> couch_log:notice( @@ -658,14 +651,10 @@ start_job_int(#job{} = Job0, State) -> -spec stop_job_int(#job{}, #state{}) -> ok | {error, term()}. stop_job_int(#job{pid = undefined}, _State) -> ok; -stop_job_int(#job{} = Job, State) -> - ok = couch_replicator_scheduler_sup:terminate_child(Job#job.pid), - demonitor(Job#job.monitor, [flush]), +stop_job_int(#job{pid = Pid} = Job, State) when is_pid(Pid) -> + ok = couch_replicator_scheduler_job:stop(Pid), ok = update_state_stopped(Job, State), - couch_log:notice( - "~p: Job ~p stopped as ~p", - [?MODULE, Job#job.id, Job#job.pid] - ). + couch_log:notice("~p: Job ~p stopped as ~p", [?MODULE, Job#job.id, Pid]). -spec remove_job_int(#job{}) -> true. remove_job_int(#job{} = Job) -> @@ -704,15 +693,15 @@ job_by_id(Id) -> -spec update_state_stopped(#job{}, #state{}) -> ok. update_state_stopped(Job, State) -> - Job1 = reset_job_process(Job), + Job1 = Job#job{pid = undefined}, Job2 = update_history(Job1, stopped, os:timestamp(), State), true = ets:insert(?MODULE, Job2), couch_stats:increment_counter([couch_replicator, jobs, stops]), ok. --spec update_state_started(#job{}, pid(), reference(), #state{}) -> ok. -update_state_started(Job, Pid, Ref, State) -> - Job1 = set_job_process(Job, Pid, Ref), +-spec update_state_started(#job{}, pid(), #state{}) -> ok. +update_state_started(Job, Pid, State) when is_pid(Pid) -> + Job1 = Job#job{pid = Pid}, Job2 = update_history(Job1, started, os:timestamp(), State), true = ets:insert(?MODULE, Job2), couch_stats:increment_counter([couch_replicator, jobs, starts]), @@ -720,20 +709,12 @@ update_state_started(Job, Pid, Ref, State) -> -spec update_state_crashed(#job{}, any(), #state{}) -> ok. update_state_crashed(Job, Reason, State) -> - Job1 = reset_job_process(Job), + Job1 = Job#job{pid = undefined}, Job2 = update_history(Job1, {crashed, Reason}, os:timestamp(), State), true = ets:insert(?MODULE, Job2), couch_stats:increment_counter([couch_replicator, jobs, crashes]), ok. --spec set_job_process(#job{}, pid(), reference()) -> #job{}. -set_job_process(#job{} = Job, Pid, Ref) when is_pid(Pid), is_reference(Ref) -> - Job#job{pid = Pid, monitor = Ref}. - --spec reset_job_process(#job{}) -> #job{}. -reset_job_process(#job{} = Job) -> - Job#job{pid = undefined, monitor = undefined}. - -spec reschedule(#state{}) -> ok. reschedule(#state{interval = Interval} = State) -> couch_replicator_share:update(running_jobs(), Interval, os:timestamp()), @@ -1253,7 +1234,7 @@ t_jobs_dont_churn_if_there_are_available_running_slots() -> reschedule(mock_state(2, 2)), ?assertEqual({2, 0}, run_stop_count()), ?assertEqual([], jobs_stopped()), - ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1)) + ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_job, start_link, 1)) end). t_start_only_pending_jobs_do_not_churn_existing_ones() -> @@ -1263,7 +1244,7 @@ t_start_only_pending_jobs_do_not_churn_existing_ones() -> continuous_running(2) ]), reschedule(mock_state(2, 2)), - ?assertEqual(1, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1)), + ?assertEqual(1, meck:num_calls(couch_replicator_scheduler_job, start_link, 1)), ?assertEqual([], jobs_stopped()), ?assertEqual({2, 0}, run_stop_count()) end). @@ -1379,7 +1360,7 @@ t_if_transient_job_crashes_it_gets_removed() -> ?assertEqual(1, ets:info(?MODULE, size)), State = #state{max_history = 3, stats_pid = self()}, {noreply, State} = handle_info( - {'DOWN', r1, process, Pid, failed}, + {'EXIT', Pid, failed}, State ), ?assertEqual(0, ets:info(?MODULE, size)) @@ -1403,7 +1384,7 @@ t_if_permanent_job_crashes_it_stays_in_ets() -> stats_pid = self() }, {noreply, State} = handle_info( - {'DOWN', r1, process, Pid, failed}, + {'EXIT', Pid, failed}, State ), ?assertEqual(1, ets:info(?MODULE, size)), @@ -1532,11 +1513,11 @@ setup_all() -> meck:expect(couch_log, notice, 2, ok), meck:expect(couch_log, warning, 2, ok), meck:expect(couch_log, error, 2, ok), - meck:expect(couch_replicator_scheduler_sup, terminate_child, 1, ok), + meck:expect(couch_replicator_scheduler_job, stop, 1, ok), meck:expect(couch_stats, increment_counter, 1, ok), meck:expect(couch_stats, update_gauge, 2, ok), Pid = mock_pid(), - meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}), + meck:expect(couch_replicator_scheduler_job, start_link, 1, {ok, Pid}), couch_replicator_share:init(). teardown_all(_) -> @@ -1547,7 +1528,7 @@ teardown_all(_) -> setup() -> meck:reset([ couch_log, - couch_replicator_scheduler_sup, + couch_replicator_scheduler_job, couch_stats, config ]). @@ -1632,8 +1613,7 @@ continuous_running(Id) when is_integer(Id) -> id = Id, history = [started(Started), added()], rep = continuous_rep(), - pid = Pid, - monitor = monitor(process, Pid) + pid = Pid }. oneshot(Id) when is_integer(Id) -> @@ -1648,8 +1628,7 @@ oneshot_running(Id) when is_integer(Id) -> id = Id, history = [started(Started), added()], rep = rep(), - pid = Pid, - monitor = monitor(process, Pid) + pid = Pid }. testjob(Hist) when is_list(Hist) -> diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index dab8f8a48..ad4656782 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -15,7 +15,8 @@ -behaviour(gen_server). -export([ - start_link/1 + start_link/1, + stop/1 ]). -export([ @@ -102,6 +103,19 @@ start_link(#rep{id = Id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) -> {error, {already_started, OtherPid}} end. +stop(Pid) when is_pid(Pid) -> + Ref = erlang:monitor(process, Pid), + unlink(Pid), + ok = gen_server:stop(Pid, shutdown, infinity), + receive + {'DOWN', Ref, _, _, Reason} -> Reason + end, + receive + {'EXIT', Pid, _} -> ok + after 0 -> ok + end, + ok. + init(InitArgs) -> {ok, InitArgs, 0}. diff --git a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl b/src/couch_replicator/src/couch_replicator_scheduler_sup.erl deleted file mode 100644 index 1d5104312..000000000 --- a/src/couch_replicator/src/couch_replicator_scheduler_sup.erl +++ /dev/null @@ -1,59 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_replicator_scheduler_sup). - --behaviour(supervisor). - -%% public api --export([ - start_link/0, - start_child/1, - terminate_child/1 -]). - -%% supervisor api --export([ - init/1 -]). - -%% includes --include("couch_replicator.hrl"). - -%% public functions - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -start_child(#rep{} = Rep) -> - supervisor:start_child(?MODULE, [Rep]). - -terminate_child(Pid) -> - supervisor:terminate_child(?MODULE, Pid). - -%% supervisor functions - -init(_Args) -> - Start = {couch_replicator_scheduler_job, start_link, []}, - % A crashed job is not entitled to immediate restart. - Restart = temporary, - Shutdown = 5000, - Type = worker, - Modules = [couch_replicator_scheduler_job], - - RestartStrategy = simple_one_for_one, - MaxR = 10, - MaxT = 3, - - ChildSpec = - {undefined, Start, Restart, Shutdown, Type, Modules}, - {ok, {{RestartStrategy, MaxR, MaxT}, [ChildSpec]}}. diff --git a/src/couch_replicator/src/couch_replicator_sup.erl b/src/couch_replicator/src/couch_replicator_sup.erl index d605b5c05..a8f2549d5 100644 --- a/src/couch_replicator/src/couch_replicator_sup.erl +++ b/src/couch_replicator/src/couch_replicator_sup.erl @@ -34,7 +34,6 @@ init(_Args) -> worker(couch_replicator_clustering), worker(couch_replicator_connection), worker(couch_replicator_rate_limiter), - sup(couch_replicator_scheduler_sup), worker(couch_replicator_scheduler), worker(couch_replicator_doc_processor), #{ @@ -48,6 +47,3 @@ init(_Args) -> worker(Mod) -> #{id => Mod, start => {Mod, start_link, []}}. - -sup(Mod) -> - #{id => Mod, type => supervisor, start => {Mod, start_link, []}, shutdown => infinity}.