This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch replace-kaboom
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 49580133ec7ac1a981023c62675303ff583ffa28
Author: Nick Vatamaniuc <vatam...@gmail.com>
AuthorDate: Mon Sep 18 11:52:17 2023 -0400

    Replace replicator open_doc_revs kaboom with something more descriptive
    
    Use something more descriptive, especially since it can bubble up to the
    user level when the replication job crashes.
    
    In order to also log the error causing the failure, move the exit call to
    the main open_doc_revs body and remove the `retries=0` clause. However,
    avoid changing the exit value shape and keep it as an atom for now. There
    is a chance it might break error handling in the upper API layers in case
    we match on the error reason "shape".
    
    To test everything end-to-end, modify the test setup to create replication
    jobs with additional options, whereas previously it was only a choice
    between continuous or one-shot.
---
 .../src/couch_replicator_api_wrap.erl              | 33 +++++-----
 .../couch_replicator_error_reporting_tests.erl     | 72 +++++++++++++++-------
 2 files changed, 65 insertions(+), 40 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl 
b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index fbe9538ad..3dac826a0 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -277,14 +277,6 @@ bulk_get_zip({Id, Rev, _}, {[_ | _] = Props}) ->
     end.
 
 -spec open_doc_revs(#httpdb{}, binary(), list(), list(), function(), any()) -> 
no_return().
-open_doc_revs(#httpdb{retries = 0} = HttpDb, Id, Revs, Options, _Fun, _Acc) ->
-    Path = encode_doc_id(Id),
-    QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | 
Options]),
-    Url = couch_util:url_strip_password(
-        couch_replicator_httpc:full_url(HttpDb, [{path, Path}, {qs, QS}])
-    ),
-    couch_log:error("Replication crashing because GET ~s failed", [Url]),
-    exit(kaboom);
 open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
     Path = encode_doc_id(Id),
     QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | 
Options]),
@@ -369,17 +361,20 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, 
Acc) ->
                 couch_replicator_httpc:full_url(HttpDb, [{path, Path}, {qs, 
QS}])
             ),
             #httpdb{retries = Retries, wait = Wait0} = HttpDb,
-            Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT),
-            couch_log:notice(
-                "Retrying GET to ~s in ~p seconds due to error ~w",
-                [Url, Wait / 1000, error_reason(Else)]
-            ),
-            ok = timer:sleep(Wait),
-            RetryDb = HttpDb#httpdb{
-                retries = Retries - 1,
-                wait = Wait
-            },
-            open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc)
+            NewRetries = Retries - 1,
+            case NewRetries > 0 of
+                true ->
+                    Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT),
+                    LogRetryMsg = "Retrying GET to ~s in ~p seconds due to 
error ~w",
+                    couch_log:notice(LogRetryMsg, [Url, Wait / 1000, 
error_reason(Else)]),
+                    ok = timer:sleep(Wait),
+                    RetryDb = HttpDb#httpdb{retries = NewRetries, wait = Wait},
+                    open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc);
+                false ->
+                    LogFailMsg = "Replication crashing because GET ~s failed. 
Error ~p",
+                    couch_log:error(LogFailMsg, [Url, error_reason(Else)]),
+                    exit(open_doc_revs_failed)
+            end
     end.
 
 error_reason({http_request_failed, "GET", _Url, {error, timeout}}) ->
diff --git 
a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl 
b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index 30bc12c29..2441f0cd7 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -34,6 +34,7 @@ error_reporting_test_() ->
             ?TDEF_FE(t_skip_doc_put_invalid_attachment_name),
             ?TDEF_FE(t_fail_revs_diff),
             ?TDEF_FE(t_fail_bulk_get, 15),
+            ?TDEF_FE(t_fail_open_docs_get, 15),
             ?TDEF_FE(t_fail_changes_queue),
             ?TDEF_FE(t_fail_changes_manager),
             ?TDEF_FE(t_fail_changes_reader_proc),
@@ -120,7 +121,7 @@ t_skip_doc_put_401_errors({_Ctx, {Source, Target}}) ->
     populate_db(Source, 6, 6, _WithAttachments = true),
     ErrBody = [<<"{\"error\":\"unauthorized\", \"reason\":\"vdu\"}">>],
     mock_fail_req(put, "/6", {ok, "401", [], ErrBody}),
-    {ok, RepId} = replicate(Source, Target, false),
+    {ok, RepId} = replicate(Source, Target, #{continuous => false}),
     {ok, Listener} = rep_result_listener(RepId),
     Res = wait_rep_result(RepId),
     % Replication job should succeed
@@ -140,7 +141,7 @@ t_skip_doc_put_403_errors({_Ctx, {Source, Target}}) ->
     populate_db(Source, 6, 6, _WithAttachments = true),
     ErrBody = [<<"{\"error\":\"forbidden\", \"reason\":\"vdu\"}">>],
     mock_fail_req(put, "/6", {ok, "403", [], ErrBody}),
-    {ok, RepId} = replicate(Source, Target, false),
+    {ok, RepId} = replicate(Source, Target, #{continuous => false}),
     {ok, Listener} = rep_result_listener(RepId),
     Res = wait_rep_result(RepId),
     % Replication job should succeed
@@ -160,7 +161,7 @@ t_skip_doc_put_413_errors({_Ctx, {Source, Target}}) ->
     populate_db(Source, 6, 6, _WithAttachments = true),
     ErrBody = [<<"{\"error\":\"too_large\", \"reason\":\"too_large\"}">>],
     mock_fail_req(put, "/6", {ok, "413", [], ErrBody}),
-    {ok, RepId} = replicate(Source, Target, false),
+    {ok, RepId} = replicate(Source, Target, #{continuous => false}),
     {ok, Listener} = rep_result_listener(RepId),
     Res = wait_rep_result(RepId),
     % Replication job should succeed
@@ -180,7 +181,7 @@ t_skip_doc_put_415_errors({_Ctx, {Source, Target}}) ->
     populate_db(Source, 6, 6, _WithAttachments = true),
     ErrBody = [<<"{\"error\":\"unsupported_media_type\", 
\"reason\":\"bad_media\"}">>],
     mock_fail_req(put, "/6", {ok, "415", [], ErrBody}),
-    {ok, RepId} = replicate(Source, Target, false),
+    {ok, RepId} = replicate(Source, Target, #{continuous => false}),
     {ok, Listener} = rep_result_listener(RepId),
     Res = wait_rep_result(RepId),
     % Replication job should succeed
@@ -202,7 +203,7 @@ t_skip_doc_put_invalid_attachment_name({_Ctx, {Source, 
Target}}) ->
         <<"{\"error\":\"bad_request\", \"reason\":\"Attachment name '_foo' 
starts with prohibited character '_'\"}">>
     ],
     mock_fail_req(put, "/6", {ok, "400", [], ErrBody}),
-    {ok, RepId} = replicate(Source, Target, false),
+    {ok, RepId} = replicate(Source, Target, #{continuous => false}),
     {ok, Listener} = rep_result_listener(RepId),
     Res = wait_rep_result(RepId),
     % Replication job should succeed
@@ -256,6 +257,33 @@ t_fail_bulk_get({_Ctx, {Source, Target}}) ->
     % Check that there was a falback to a plain GET
     ?assertEqual(1, meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 
6)).
 
+t_fail_open_docs_get({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    Opts = #{
+        % We're testing the case of individual doc rev GETs
+        use_bulk_get => false,
+        % Perform at least one retry before giving up (for extra coverage)
+        retries_per_request => 2
+    },
+    {ok, RepId} = replicate(Source, Target, Opts),
+    wait_target_in_sync(Source, Target),
+
+    {ok, Listener} = rep_result_listener(RepId),
+    % Break open_doc_revs on the server side and see what happens
+    meck:new(fabric_doc_open_revs, [passthrough]),
+    meck:expect(fabric_doc_open_revs, go, fun
+        (Source, <<"6">>, _, _) ->
+            % This is a random error, no particular reason for a 404
+            meck:exception(throw, not_found);
+        (ArgDb, ArgDocId, ArgRevs, ArgOpts) ->
+            meck:passthrough([ArgDb, ArgDocId, ArgRevs, ArgOpts])
+    end),
+    populate_db(Source, 6, 6),
+    {error, Result} = wait_rep_result(RepId),
+    ?assertMatch({worker_died, _, {process_died, _, open_doc_revs_failed}}, 
Result),
+    ?assert(meck:num_calls(fabric_doc_open_revs, go, 4) >= 2),
+    couch_replicator_notifier:stop(Listener).
+
 t_fail_changes_queue({_Ctx, {Source, Target}}) ->
     populate_db(Source, 1, 5),
     {ok, RepId} = replicate(Source, Target),
@@ -311,7 +339,7 @@ t_dont_start_duplicate_job({_Ctx, {Source, Target}}) ->
     meck:new(couch_replicator_pg, [passthrough]),
     Pid = pid_from_another_node(),
     meck:expect(couch_replicator_pg, should_start, fun(_, _) -> {no, Pid} end),
-    Rep = make_rep(Source, Target, true),
+    Rep = make_rep(Source, Target, #{continuous => true}),
     ExpectErr = {error, {already_started, Pid}},
     ?assertEqual(ExpectErr, couch_replicator_scheduler_job:start_link(Rep)).
 
@@ -444,26 +472,28 @@ wait_target_in_sync_loop(DocCount, TargetName, 
RetriesLeft) ->
     end.
 
 replicate(Source, Target) ->
-    replicate(Source, Target, true).
+    replicate(Source, Target, #{}).
 
-replicate(Source, Target, Continuous) ->
-    Rep = make_rep(Source, Target, Continuous),
+replicate(Source, Target, #{} = Opts) ->
+    Rep = make_rep(Source, Target, Opts),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     {ok, Rep#rep.id}.
 
-make_rep(Source, Target, Continuous) ->
-    RepObject =
-        {[
-            {<<"source">>, url(Source)},
-            {<<"target">>, url(Target)},
-            {<<"continuous">>, Continuous},
-            {<<"worker_processes">>, 1},
-            {<<"retries_per_request">>, 1},
-            % Low connection timeout so _changes feed gets restarted quicker
-            {<<"connection_timeout">>, 3000}
-        ]},
-    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepObject, ?ADMIN_USER),
+make_rep(Source, Target, #{} = OverrideOpts) ->
+    Opts0 = #{
+        source => url(Source),
+        target => url(Target),
+        continuous => true,
+        worker_processes => 1,
+        retries_per_request => 1,
+        % Low connection timeout so _changes feed gets restarted quicker
+        connection_timeout => 3000
+    },
+    RepMap = maps:merge(Opts0, OverrideOpts),
+    % parse_rep_doc accepts {[...]} ejson format
+    RepEJson = couch_util:json_decode(couch_util:json_encode(RepMap)),
+    {ok, Rep} = couch_replicator_parse:parse_rep_doc(RepEJson, ?ADMIN_USER),
     Rep.
 
 url(DbName) ->

Reply via email to