This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch fix-replicator-reschedule in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 6639b9f02b79530cfaf14d7867cacf3971ffaab4 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Tue Aug 13 16:53:44 2019 -0400 Fix replication rescheduling Running < MaxJobs corner case Previously, when total number of replication jobs exceed `MaxJobs`, if some jobs crashed, additional jobs didn't start immediately to bring the running total up to the `MaxJobs` limit. Then, during rescheduling, the `Running == MaxJobs, Pending > 0` guard would fail and jobs would not rotate. In other words, if at least one job crashed, rotation didn't happen. The fix is to simplify the rotation logic to handle the `Running < MaxJobs` case. First, up to `Churn` number of jobs are stopped, then enough jobs are started to reach the MaxJobs limit. This case also handles the `start_pending_jobs/3` logic so there is no need to call that before rotation happens. --- .../src/couch_replicator_scheduler.erl | 70 ++++++++++++++-------- 1 file changed, 44 insertions(+), 26 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_scheduler.erl b/src/couch_replicator/src/couch_replicator_scheduler.erl index e3dbede..e0bb10d 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler.erl @@ -723,35 +723,30 @@ reset_job_process(#job{} = Job) -> -spec reschedule(#state{}) -> ok. reschedule(State) -> - Running = running_job_count(), - Pending = pending_job_count(), - stop_excess_jobs(State, Running), - start_pending_jobs(State, Running, Pending), - rotate_jobs(State, Running, Pending), - update_running_jobs_stats(State#state.stats_pid), - ok. + StopCount = stop_excess_jobs(State, running_job_count()), + rotate_jobs(State, StopCount), + update_running_jobs_stats(State#state.stats_pid). --spec stop_excess_jobs(#state{}, non_neg_integer()) -> ok. +-spec stop_excess_jobs(#state{}, non_neg_integer()) -> non_neg_integer(). stop_excess_jobs(State, Running) -> #state{max_jobs=MaxJobs} = State, - StopCount = Running - MaxJobs, - if StopCount =< 0 -> ok; true -> + StopCount = max(0, Running - MaxJobs), + if StopCount == 0 -> ok; true -> Stopped = stop_jobs(StopCount, true, State), OneshotLeft = StopCount - Stopped, if OneshotLeft =< 0 -> ok; true -> stop_jobs(OneshotLeft, false, State), ok end - end. + end, + StopCount. start_pending_jobs(State) -> - start_pending_jobs(State, running_job_count(), pending_job_count()). - - -start_pending_jobs(State, Running, Pending) -> #state{max_jobs=MaxJobs} = State, + Running = running_job_count(), + Pending = pending_job_count(), if Running < MaxJobs, Pending > 0 -> start_jobs(MaxJobs - Running, State); true -> @@ -759,13 +754,19 @@ start_pending_jobs(State, Running, Pending) -> end. --spec rotate_jobs(#state{}, non_neg_integer(), non_neg_integer()) -> ok. -rotate_jobs(State, Running, Pending) -> +-spec rotate_jobs(#state{}, non_neg_integer()) -> ok. +rotate_jobs(State, ChurnSoFar) -> #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State, - if Running == MaxJobs, Pending > 0 -> - RotateCount = lists:min([Pending, Running, MaxChurn]), - StopCount = stop_jobs(RotateCount, true, State), - start_jobs(StopCount, State); + Running = running_job_count(), + Pending = pending_job_count(), + % Reduce MaxChurn by the number of already stopped jobs in the + % current rescheduling cycle. + Churn = max(0, MaxChurn - ChurnSoFar), + if Running =< MaxJobs, Pending > 0 -> + StopCount = lists:min([Pending, Running, Churn]), + stop_jobs(StopCount, true, State), + StartCount = max(0, MaxJobs - running_job_count()), + start_jobs(StartCount, State); true -> ok end. @@ -1047,6 +1048,7 @@ scheduler_test_() -> t_excess_prefer_continuous_first(), t_stop_oldest_first(), t_start_oldest_first(), + t_jobs_churn_even_if_not_all_max_jobs_are_running(), t_dont_stop_if_nothing_pending(), t_max_churn_limits_number_of_rotated_jobs(), t_existing_jobs(), @@ -1056,7 +1058,7 @@ scheduler_test_() -> t_rotate_continuous_only_if_mixed(), t_oneshot_dont_get_starting_priority(), t_oneshot_will_hog_the_scheduler(), - t_if_excess_is_trimmed_rotation_doesnt_happen(), + t_if_excess_is_trimmed_rotation_still_happens(), t_if_transient_job_crashes_it_gets_removed(), t_if_permanent_job_crashes_it_stays_in_ets(), t_job_summary_running(), @@ -1177,10 +1179,10 @@ t_stop_oldest_first() -> continuous_running(5) ], setup_jobs(Jobs), - reschedule(mock_state(2)), + reschedule(mock_state(2, 1)), ?assertEqual({2, 1}, run_stop_count()), ?assertEqual([4], jobs_stopped()), - reschedule(mock_state(1)), + reschedule(mock_state(1, 1)), ?assertEqual([7], jobs_running()) end). @@ -1192,6 +1194,22 @@ t_start_oldest_first() -> ?assertEqual({1, 2}, run_stop_count()), ?assertEqual([2], jobs_running()), reschedule(mock_state(2)), + ?assertEqual({2, 1}, run_stop_count()), + % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should + % be running. + ?assertEqual([2], jobs_stopped()) + end). + + +t_jobs_churn_even_if_not_all_max_jobs_are_running() -> + ?_test(begin + setup_jobs([ + continuous_running(7), + continuous(2), + continuous(5) + ]), + reschedule(mock_state(2, 2)), + ?assertEqual({2, 1}, run_stop_count()), ?assertEqual([7], jobs_stopped()) end). @@ -1289,7 +1307,7 @@ t_oneshot_will_hog_the_scheduler() -> end). -t_if_excess_is_trimmed_rotation_doesnt_happen() -> +t_if_excess_is_trimmed_rotation_still_happens() -> ?_test(begin Jobs = [ continuous(1), @@ -1298,7 +1316,7 @@ t_if_excess_is_trimmed_rotation_doesnt_happen() -> ], setup_jobs(Jobs), reschedule(mock_state(1)), - ?assertEqual([3], jobs_running()) + ?assertEqual([1], jobs_running()) end).
