This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch replicator-skip-explicit-4xx-errors-only
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 4bcca6bc8c5983ae38fdd78a239eac69282e9f0e
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Thu Jul 13 14:02:55 2023 -0400

    Improve replicator handling of 4xx error on individual doc PUTs.
    
    Documents with attachments are written as individual HTTP PUT multipart
    requests. If a document PUT request fails with a 401, 403 or 413 error, the
    expectation is that the replicator would skip over that document, increment 
the
    `doc_write_failures` metric and continue replicating and checkpointing. That
    behavior is by design. However, previously, before this commit, documents 
were
    also skipped in case of any unknown 4xx error (except a 408 which was 
treated as a
    timeout and retried). That behavior is a bit unexpected as it's unclear what
    the reason for the 4xx error might be.
    
    Issue: https://github.com/apache/couchdb/issues/4676
---
 .../src/couch_replicator_api_wrap.erl              |  18 ++-
 .../src/couch_replicator_worker.erl                |  94 +++++++-----
 .../couch_replicator_error_reporting_tests.erl     | 160 +++++++++++++++++++--
 3 files changed, 218 insertions(+), 54 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl 
b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index a44a79da1..4a2057773 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -343,6 +343,8 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, 
Acc) ->
             throw(Stub);
         {'DOWN', Ref, process, Pid, {http_request_failed, _, _, max_backoff}} 
->
             exit(max_backoff);
+        {'DOWN', Ref, process, Pid, {doc_write_failed, _} = Error} ->
+            exit(Error);
         {'DOWN', Ref, process, Pid, request_uri_too_long} ->
             NewMaxLen = get_value(max_url_len, Options, ?MAX_URL_LEN) div 2,
             case NewMaxLen < ?MIN_URL_LEN of
@@ -451,17 +453,21 @@ update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, 
Options, Type) ->
             (409, _, _) ->
                 throw(conflict);
             (Code, _, {Props}) ->
-                case {Code, get_value(<<"error">>, Props)} of
+                Error = get_value(<<"error">>, Props),
+                Reason = get_value(<<"reason">>, Props),
+                case {Code, Error} of
                     {401, <<"unauthorized">>} ->
-                        throw({unauthorized, get_value(<<"reason">>, Props)});
+                        throw({unauthorized, Reason});
                     {403, <<"forbidden">>} ->
-                        throw({forbidden, get_value(<<"reason">>, Props)});
+                        throw({forbidden, Reason});
                     {412, <<"missing_stub">>} ->
-                        throw({missing_stub, get_value(<<"reason">>, Props)});
+                        throw({missing_stub, Reason});
                     {413, _} ->
                         {error, request_body_too_large};
-                    {_, Error} ->
-                        {error, Error}
+                    {_, undefined} ->
+                        {error, {Code, Props}};
+                    {_, _} ->
+                        {error, {Error, Reason}}
                 end
         end
     ).
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl 
b/src/couch_replicator/src/couch_replicator_worker.erl
index 46e4a6e94..ab56c34af 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -238,6 +238,8 @@ handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, 
State) ->
     {stop, {shutdown, Err}, State};
 handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) ->
     {stop, {shutdown, Err}, State};
+handle_info({'EXIT', _Pid, {doc_write_failed, _} = Err}, State) ->
+    {stop, {shutdown, Err}, State};
 handle_info({'EXIT', Pid, Reason}, State) ->
     {stop, {process_died, Pid, Reason}, State}.
 
@@ -583,44 +585,68 @@ handle_flush_docs_result({error, {bulk_docs_failed, _, _} 
= Err}, _, _) ->
 
 flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
     try couch_replicator_api_wrap:update_doc(Target, Doc, [], 
?REPLICATED_CHANGES) of
-        {ok, _} ->
-            ok;
-        Error ->
-            couch_log:error(
-                "Replicator: error writing document `~s` to `~s`: ~s",
-                [Id, couch_replicator_api_wrap:db_uri(Target), 
couch_util:to_binary(Error)]
-            ),
-            Error
+        {ok, _} -> ok;
+        Error -> handle_doc_write_error(Error, Target, Id, Pos, RevId)
     catch
-        throw:{missing_stub, _} = MissingStub ->
-            throw(MissingStub);
         throw:{Error, Reason} ->
-            couch_log:error(
-                "Replicator: couldn't write document `~s`, revision `~s`,"
-                " to target database `~s`. Error: `~s`, reason: `~s`.",
-                [
-                    Id,
-                    couch_doc:rev_to_str({Pos, RevId}),
-                    couch_replicator_api_wrap:db_uri(Target),
-                    to_binary(Error),
-                    to_binary(Reason)
-                ]
-            ),
-            {error, Error};
-        throw:Err ->
-            couch_log:error(
-                "Replicator: couldn't write document `~s`, revision `~s`,"
-                " to target database `~s`. Error: `~s`.",
-                [
-                    Id,
-                    couch_doc:rev_to_str({Pos, RevId}),
-                    couch_replicator_api_wrap:db_uri(Target),
-                    to_binary(Err)
-                ]
-            ),
-            {error, Err}
+            handle_doc_write_error({Error, Reason}, Target, Id, Pos, RevId)
     end.
 
+% In most cases we fail the replication job by re-throwing the error.
+% The only exceptions are
+%  401 : unauthorized
+%  403 : forbidden
+%  413 : request_body_too_large
+%
+handle_doc_write_error({missing_stub, _} = MissingStub, _, _, _, _) ->
+    throw(MissingStub);
+handle_doc_write_error({error, request_body_too_large} = Error, Target, Id, _, 
_) ->
+    couch_log:error(
+        "Replicator: skipping writing document `~s` to `~s`: 
request_body_too_large.",
+        [Id, couch_replicator_api_wrap:db_uri(Target)]
+    ),
+    Error;
+handle_doc_write_error({Error, Reason}, Target, Id, Pos, RevId) when
+    Error == unauthorized orelse Error == forbidden
+->
+    couch_log:error(
+        "Replicator: skipping writing document `~s`, revision `~s`,"
+        " to target database `~s`. Error: `~s`, reason: `~s`.",
+        [
+            Id,
+            couch_doc:rev_to_str({Pos, RevId}),
+            couch_replicator_api_wrap:db_uri(Target),
+            to_binary(Error),
+            to_binary(Reason)
+        ]
+    ),
+    {error, Error};
+handle_doc_write_error({error, {Error, Reason}}, Target, Id, Pos, RevId) ->
+    couch_log:error(
+        "Replicator: error writing document `~s`, revision `~s`,"
+        " to target database `~s`. Error: `~s`, reason: `~s`.",
+        [
+            Id,
+            couch_doc:rev_to_str({Pos, RevId}),
+            couch_replicator_api_wrap:db_uri(Target),
+            to_binary(Error),
+            to_binary(Reason)
+        ]
+    ),
+    exit({doc_write_failed, {Error, Reason}});
+handle_doc_write_error(Error, Target, Id, Pos, RevId) ->
+    couch_log:error(
+        "Replicator: error writing document `~s`, revision `~s`,"
+        " to target database `~s`. Error: `~s`.",
+        [
+            Id,
+            couch_doc:rev_to_str({Pos, RevId}),
+            couch_replicator_api_wrap:db_uri(Target),
+            to_binary(Error)
+        ]
+    ),
+    exit({doc_write_failed, Error}).
+
 find_missing(DocInfos, Target, Parent, #fetch_stats{} = St) ->
     {IdRevs, AllCount} = lists:foldr(
         fun
diff --git 
a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl 
b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index d9c6a1048..d71cdda4d 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -24,6 +24,12 @@ error_reporting_test_() ->
         [
             ?TDEF_FE(t_fail_bulk_docs),
             ?TDEF_FE(t_fail_changes_reader),
+            ?TDEF_FE(t_fail_doc_put_4xx_well_formed_json_error),
+            ?TDEF_FE(t_fail_doc_put_4xx_unexpected_json_error),
+            ?TDEF_FE(t_fail_doc_put_4xx_invalid_json_error),
+            ?TDEF_FE(t_skip_doc_put_401_errors),
+            ?TDEF_FE(t_skip_doc_put_403_errors),
+            ?TDEF_FE(t_skip_doc_put_413_errors),
             ?TDEF_FE(t_fail_revs_diff),
             ?TDEF_FE(t_fail_bulk_get, 15),
             ?TDEF_FE(t_fail_changes_queue),
@@ -41,7 +47,7 @@ t_fail_bulk_docs({_Ctx, {Source, Target}}) ->
     wait_target_in_sync(Source, Target),
 
     {ok, Listener} = rep_result_listener(RepId),
-    mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
+    mock_fail_req(post, "/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
     populate_db(Source, 6, 6),
 
     {error, Result} = wait_rep_result(RepId),
@@ -55,7 +61,7 @@ t_fail_changes_reader({_Ctx, {Source, Target}}) ->
     wait_target_in_sync(Source, Target),
 
     {ok, Listener} = rep_result_listener(RepId),
-    mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
+    mock_fail_req(get, "/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
     populate_db(Source, 6, 6),
 
     {error, Result} = wait_rep_result(RepId),
@@ -63,13 +69,117 @@ t_fail_changes_reader({_Ctx, {Source, Target}}) ->
 
     couch_replicator_notifier:stop(Listener).
 
+t_fail_doc_put_4xx_well_formed_json_error({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    {ok, RepId} = replicate(Source, Target),
+    wait_target_in_sync(Source, Target),
+
+    {ok, Listener} = rep_result_listener(RepId),
+    ErrBody = [<<"{\"error\":\"x\", \"reason\":\"y\"}">>],
+    mock_fail_req(put, "/6", {ok, "400", [], ErrBody}),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+
+    {error, Result} = wait_rep_result(RepId),
+    ?assertEqual({doc_write_failed, {<<"x">>, <<"y">>}}, Result),
+
+    couch_replicator_notifier:stop(Listener).
+
+t_fail_doc_put_4xx_unexpected_json_error({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    {ok, RepId} = replicate(Source, Target),
+    wait_target_in_sync(Source, Target),
+
+    {ok, Listener} = rep_result_listener(RepId),
+    ErrBody = [<<"{\"a\":\"b\"}">>],
+    mock_fail_req(put, "/6", {ok, "400", [], ErrBody}),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+
+    {error, Result} = wait_rep_result(RepId),
+    ?assertEqual({doc_write_failed, {400, [{<<"a">>, <<"b">>}]}}, Result),
+
+    couch_replicator_notifier:stop(Listener).
+
+t_fail_doc_put_4xx_invalid_json_error({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    {ok, RepId} = replicate(Source, Target),
+    wait_target_in_sync(Source, Target),
+
+    {ok, Listener} = rep_result_listener(RepId),
+    mock_fail_req(put, "/6", {ok, "400", [], [<<"potato">>]}),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+
+    {error, Result} = wait_rep_result(RepId),
+    ?assertMatch({doc_write_failed, {invalid_json, _}}, Result),
+
+    couch_replicator_notifier:stop(Listener).
+
+t_skip_doc_put_401_errors({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+    ErrBody = [<<"{\"error\":\"unauthorized\", \"reason\":\"vdu\"}">>],
+    mock_fail_req(put, "/6", {ok, "401", [], ErrBody}),
+    {ok, RepId} = replicate(Source, Target, false),
+    {ok, Listener} = rep_result_listener(RepId),
+    Res = wait_rep_result(RepId),
+    % Replication job should succeed
+    ?assertMatch({ok, {[_ | _]}}, Res),
+    {ok, {Props}} = Res,
+    History = proplists:get_value(<<"history">>, Props),
+    ?assertMatch([{[_ | _]}], History),
+    [{HistProps}] = History,
+    DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
+    DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, 
HistProps),
+    ?assertEqual(5, DocsWritten),
+    ?assertEqual(1, DocWriteFailures),
+    couch_replicator_notifier:stop(Listener).
+
+t_skip_doc_put_403_errors({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+    ErrBody = [<<"{\"error\":\"forbidden\", \"reason\":\"vdu\"}">>],
+    mock_fail_req(put, "/6", {ok, "403", [], ErrBody}),
+    {ok, RepId} = replicate(Source, Target, false),
+    {ok, Listener} = rep_result_listener(RepId),
+    Res = wait_rep_result(RepId),
+    % Replication job should succeed
+    ?assertMatch({ok, {[_ | _]}}, Res),
+    {ok, {Props}} = Res,
+    History = proplists:get_value(<<"history">>, Props),
+    ?assertMatch([{[_ | _]}], History),
+    [{HistProps}] = History,
+    DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
+    DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, 
HistProps),
+    ?assertEqual(5, DocsWritten),
+    ?assertEqual(1, DocWriteFailures),
+    couch_replicator_notifier:stop(Listener).
+
+t_skip_doc_put_413_errors({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+    ErrBody = [<<"{\"error\":\"too_large\", \"reason\":\"too_large\"}">>],
+    mock_fail_req(put, "/6", {ok, "413", [], ErrBody}),
+    {ok, RepId} = replicate(Source, Target, false),
+    {ok, Listener} = rep_result_listener(RepId),
+    Res = wait_rep_result(RepId),
+    % Replication job should succeed
+    ?assertMatch({ok, {[_ | _]}}, Res),
+    {ok, {Props}} = Res,
+    History = proplists:get_value(<<"history">>, Props),
+    ?assertMatch([{[_ | _]}], History),
+    [{HistProps}] = History,
+    DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
+    DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, 
HistProps),
+    ?assertEqual(5, DocsWritten),
+    ?assertEqual(1, DocWriteFailures),
+    couch_replicator_notifier:stop(Listener).
+
 t_fail_revs_diff({_Ctx, {Source, Target}}) ->
     populate_db(Source, 1, 5),
     {ok, RepId} = replicate(Source, Target),
     wait_target_in_sync(Source, Target),
 
     {ok, Listener} = rep_result_listener(RepId),
-    mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
+    mock_fail_req(post, "/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
     populate_db(Source, 6, 6),
 
     {error, Result} = wait_rep_result(RepId),
@@ -87,7 +197,7 @@ t_fail_bulk_get({_Ctx, {Source, Target}}) ->
     wait_target_in_sync(Source, Target),
 
     % Tolerate a 500 error
-    mock_fail_req("/_bulk_get", {ok, "501", [], [<<"not_implemented">>]}),
+    mock_fail_req(post, "/_bulk_get", {ok, "501", [], 
[<<"not_implemented">>]}),
     meck:reset(couch_replicator_api_wrap),
     populate_db(Source, 6, 6),
     wait_target_in_sync(Source, Target),
@@ -95,7 +205,7 @@ t_fail_bulk_get({_Ctx, {Source, Target}}) ->
     ?assertEqual(1, meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 
6)),
 
     % Tolerate a 400 error
-    mock_fail_req("/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
+    mock_fail_req(post, "/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
     meck:reset(couch_replicator_api_wrap),
     populate_db(Source, 7, 7),
     wait_target_in_sync(Source, Target),
@@ -157,7 +267,7 @@ t_dont_start_duplicate_job({_Ctx, {Source, Target}}) ->
     meck:new(couch_replicator_pg, [passthrough]),
     Pid = pid_from_another_node(),
     meck:expect(couch_replicator_pg, should_start, fun(_, _) -> {no, Pid} end),
-    Rep = make_rep(Source, Target),
+    Rep = make_rep(Source, Target, true),
     ExpectErr = {error, {already_started, Pid}},
     ?assertEqual(ExpectErr, couch_replicator_scheduler_job:start_link(Rep)).
 
@@ -205,16 +315,19 @@ pid_from_another_node() ->
     ?assertEqual('A@1', node(Pid)),
     Pid.
 
-mock_fail_req(Path, Return) ->
+mock_fail_req(Method, Path, Return) ->
     meck:expect(
         ibrowse,
         send_req_direct,
         fun(W, Url, Headers, Meth, Body, Opts, TOut) ->
             Args = [W, Url, Headers, Meth, Body, Opts, TOut],
             #{path := UPath} = uri_string:parse(Url),
-            case lists:suffix(Path, UPath) of
-                true -> Return;
-                false -> meck:passthrough(Args)
+            case {lists:suffix(Path, UPath), Method == Meth} of
+                {true, true} ->
+                    _ = meck:passthrough(Args),
+                    Return;
+                {_, _} ->
+                    meck:passthrough(Args)
             end
         end
     ).
@@ -237,10 +350,18 @@ wait_rep_result(RepId) ->
     end.
 
 populate_db(DbName, Start, End) ->
+    populate_db(DbName, Start, End, false).
+
+populate_db(DbName, Start, End, WithAttachments) ->
     Docs = lists:foldl(
         fun(DocIdCounter, Acc) ->
             Id = integer_to_binary(DocIdCounter),
-            Doc = #doc{id = Id, body = {[]}},
+            Atts =
+                case WithAttachments of
+                    true -> [att(<<"att1">>, 1024, <<"app/binary">>)];
+                    false -> []
+                end,
+            Doc = #doc{id = Id, body = {[]}, atts = Atts},
             [Doc | Acc]
         end,
         [],
@@ -248,6 +369,14 @@ populate_db(DbName, Start, End) ->
     ),
     {ok, [_ | _]} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
 
+att(Name, Size, Type) ->
+    couch_att:new([
+        {name, Name},
+        {type, Type},
+        {att_len, Size},
+        {data, fun(Count) -> crypto:strong_rand_bytes(Count) end}
+    ]).
+
 wait_target_in_sync(Source, Target) ->
     {ok, SourceDocCount} = fabric:get_doc_count(Source),
     wait_target_in_sync_loop(SourceDocCount, Target, 300).
@@ -271,17 +400,20 @@ wait_target_in_sync_loop(DocCount, TargetName, 
RetriesLeft) ->
     end.
 
 replicate(Source, Target) ->
-    Rep = make_rep(Source, Target),
+    replicate(Source, Target, true).
+
+replicate(Source, Target, Continuous) ->
+    Rep = make_rep(Source, Target, Continuous),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     {ok, Rep#rep.id}.
 
-make_rep(Source, Target) ->
+make_rep(Source, Target, Continuous) ->
     RepObject =
         {[
             {<<"source">>, url(Source)},
             {<<"target">>, url(Target)},
-            {<<"continuous">>, true},
+            {<<"continuous">>, Continuous},
             {<<"worker_processes">>, 1},
             {<<"retries_per_request">>, 1},
             % Low connection timeout so _changes feed gets restarted quicker

Reply via email to