This is an automated email from the ASF dual-hosted git repository. ronny pushed a commit to branch nouveau4win in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 1e640aa639b03527cfd30588de01ab4fa7d55fc4 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Thu Jul 13 14:02:55 2023 -0400 Crash replication jobs on unexpected 4xx errors Documents with attachments are written as individual HTTP PUT multipart requests. If a document PUT request fails with a 401, 403 or 413 error, the expectation is that the replicator would skip over that document, increment the `doc_write_failures` metric and continue replicating. That behavior is by design. However, previously, before this commit, documents were also skipped in case of any unknown 4xx error. This commit switches the default behavior so that replication does not continue on unexpected 4xx errors. If the error is intermittent, the job will crash, retry, and eventually succeed. If the error is persistent, exponential back-off will keep it from retrying indefinitely in a tight loop. Documents will be skipped only for 401/403/413/415 HTTP errors, and for 400 errors with a reason related to attachment name validation failures. The second improvement in this commit is that HTTP errors and error reasons bubble up through the API and are emitted as error logs. This should help users diagnose any issue more quickly. 
Issue: https://github.com/apache/couchdb/issues/4676 --- .../src/couch_replicator_api_wrap.erl | 30 ++- .../src/couch_replicator_worker.erl | 104 +++++++---- .../couch_replicator_error_reporting_tests.erl | 204 +++++++++++++++++++-- 3 files changed, 280 insertions(+), 58 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl index a44a79da1..fbe9538ad 100644 --- a/src/couch_replicator/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl @@ -343,6 +343,8 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) -> throw(Stub); {'DOWN', Ref, process, Pid, {http_request_failed, _, _, max_backoff}} -> exit(max_backoff); + {'DOWN', Ref, process, Pid, {doc_write_failed, _} = Error} -> + exit(Error); {'DOWN', Ref, process, Pid, request_uri_too_long} -> NewMaxLen = get_value(max_url_len, Options, ?MAX_URL_LEN) div 2, case NewMaxLen < ?MIN_URL_LEN of @@ -451,17 +453,25 @@ update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) -> (409, _, _) -> throw(conflict); (Code, _, {Props}) -> - case {Code, get_value(<<"error">>, Props)} of - {401, <<"unauthorized">>} -> - throw({unauthorized, get_value(<<"reason">>, Props)}); - {403, <<"forbidden">>} -> - throw({forbidden, get_value(<<"reason">>, Props)}); - {412, <<"missing_stub">>} -> - throw({missing_stub, get_value(<<"reason">>, Props)}); - {413, _} -> + Error = get_value(<<"error">>, Props), + Reason = get_value(<<"reason">>, Props), + case {Code, Error, Reason} of + {401, <<"unauthorized">>, _} -> + throw({unauthorized, Reason}); + {403, <<"forbidden">>, _} -> + throw({forbidden, Reason}); + {412, <<"missing_stub">>, _} -> + throw({missing_stub, Reason}); + {413, _, _} -> {error, request_body_too_large}; - {_, Error} -> - {error, Error} + {415, _, _} -> + {error, unsupported_media_type}; + {400, <<"bad_request">>, <<"Attachment name ", _/binary>>} -> + {error, 
{invalid_attachment_name, Reason}}; + {_, undefined, _} -> + {error, {Code, Props}}; + {_, _, _} -> + {error, {Error, Reason}} end end ). diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl index 46e4a6e94..478ad03d8 100644 --- a/src/couch_replicator/src/couch_replicator_worker.erl +++ b/src/couch_replicator/src/couch_replicator_worker.erl @@ -238,6 +238,8 @@ handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, State) -> {stop, {shutdown, Err}, State}; handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) -> {stop, {shutdown, Err}, State}; +handle_info({'EXIT', _Pid, {doc_write_failed, _} = Err}, State) -> + {stop, {shutdown, Err}, State}; handle_info({'EXIT', Pid, Reason}, State) -> {stop, {process_died, Pid, Reason}, State}. @@ -583,44 +585,78 @@ handle_flush_docs_result({error, {bulk_docs_failed, _, _} = Err}, _, _) -> flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) -> try couch_replicator_api_wrap:update_doc(Target, Doc, [], ?REPLICATED_CHANGES) of - {ok, _} -> - ok; - Error -> - couch_log:error( - "Replicator: error writing document `~s` to `~s`: ~s", - [Id, couch_replicator_api_wrap:db_uri(Target), couch_util:to_binary(Error)] - ), - Error + {ok, _} -> ok; + Error -> handle_doc_write_error(Error, Target, Id, Pos, RevId) catch - throw:{missing_stub, _} = MissingStub -> - throw(MissingStub); throw:{Error, Reason} -> - couch_log:error( - "Replicator: couldn't write document `~s`, revision `~s`," - " to target database `~s`. Error: `~s`, reason: `~s`.", - [ - Id, - couch_doc:rev_to_str({Pos, RevId}), - couch_replicator_api_wrap:db_uri(Target), - to_binary(Error), - to_binary(Reason) - ] - ), - {error, Error}; - throw:Err -> - couch_log:error( - "Replicator: couldn't write document `~s`, revision `~s`," - " to target database `~s`. 
Error: `~s`.", - [ - Id, - couch_doc:rev_to_str({Pos, RevId}), - couch_replicator_api_wrap:db_uri(Target), - to_binary(Err) - ] - ), - {error, Err} + handle_doc_write_error({Error, Reason}, Target, Id, Pos, RevId) end. +% In most cases we fail the replication job by re-throwing the error. +% The only exceptions are expected validation and VDU failures +% 401 : unauthorized +% 403 : forbidden +% 413 : request_body_too_large +% 415 : unsupported_media_type +% 400 : bad_request where reason starts with "Attachment name " +% +handle_doc_write_error({missing_stub, _} = MissingStub, _, _, _, _) -> + throw(MissingStub); +handle_doc_write_error({error, Reason} = Error, Target, Id, _, _) when + Reason == request_body_too_large orelse Reason == unsupported_media_type +-> + couch_log:error( + "Replicator: skipping writing document `~s` to `~s` : ~s.", + [Id, couch_replicator_api_wrap:db_uri(Target), Reason] + ), + Error; +handle_doc_write_error({error, {invalid_attachment_name, Reason}} = Error, Target, Id, _, _) -> + couch_log:error( + "Replicator: skipping writing document `~s` to `~s` : invalid_attachment_name ~s", + [Id, couch_replicator_api_wrap:db_uri(Target), Reason] + ), + Error; +handle_doc_write_error({Error, Reason}, Target, Id, Pos, RevId) when + Error == unauthorized orelse Error == forbidden +-> + couch_log:error( + "Replicator: skipping writing document `~s`, revision `~s`," + " to target database `~s`. Error: `~s`, reason: `~s`.", + [ + Id, + couch_doc:rev_to_str({Pos, RevId}), + couch_replicator_api_wrap:db_uri(Target), + to_binary(Error), + to_binary(Reason) + ] + ), + {error, Error}; +handle_doc_write_error({error, {Error, Reason}}, Target, Id, Pos, RevId) -> + couch_log:error( + "Replicator: error writing document `~s`, revision `~s`," + " to target database `~s`. 
Error: `~s`, reason: `~s`.", + [ + Id, + couch_doc:rev_to_str({Pos, RevId}), + couch_replicator_api_wrap:db_uri(Target), + to_binary(Error), + to_binary(Reason) + ] + ), + exit({doc_write_failed, {Error, Reason}}); +handle_doc_write_error(Error, Target, Id, Pos, RevId) -> + couch_log:error( + "Replicator: error writing document `~s`, revision `~s`," + " to target database `~s`. Error: `~s`.", + [ + Id, + couch_doc:rev_to_str({Pos, RevId}), + couch_replicator_api_wrap:db_uri(Target), + to_binary(Error) + ] + ), + exit({doc_write_failed, Error}). + find_missing(DocInfos, Target, Parent, #fetch_stats{} = St) -> {IdRevs, AllCount} = lists:foldr( fun diff --git a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl index d9c6a1048..30bc12c29 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl @@ -24,6 +24,14 @@ error_reporting_test_() -> [ ?TDEF_FE(t_fail_bulk_docs), ?TDEF_FE(t_fail_changes_reader), + ?TDEF_FE(t_fail_doc_put_4xx_well_formed_json_error), + ?TDEF_FE(t_fail_doc_put_4xx_unexpected_json_error), + ?TDEF_FE(t_fail_doc_put_4xx_invalid_json_error), + ?TDEF_FE(t_skip_doc_put_401_errors), + ?TDEF_FE(t_skip_doc_put_403_errors), + ?TDEF_FE(t_skip_doc_put_413_errors), + ?TDEF_FE(t_skip_doc_put_415_errors), + ?TDEF_FE(t_skip_doc_put_invalid_attachment_name), ?TDEF_FE(t_fail_revs_diff), ?TDEF_FE(t_fail_bulk_get, 15), ?TDEF_FE(t_fail_changes_queue), @@ -41,7 +49,7 @@ t_fail_bulk_docs({_Ctx, {Source, Target}}) -> wait_target_in_sync(Source, Target), {ok, Listener} = rep_result_listener(RepId), - mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}), + mock_fail_req(post, "/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}), populate_db(Source, 6, 6), {error, Result} = wait_rep_result(RepId), @@ -55,7 +63,7 @@ t_fail_changes_reader({_Ctx, 
{Source, Target}}) -> wait_target_in_sync(Source, Target), {ok, Listener} = rep_result_listener(RepId), - mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), + mock_fail_req(get, "/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), populate_db(Source, 6, 6), {error, Result} = wait_rep_result(RepId), @@ -63,13 +71,159 @@ t_fail_changes_reader({_Ctx, {Source, Target}}) -> couch_replicator_notifier:stop(Listener). +t_fail_doc_put_4xx_well_formed_json_error({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + ErrBody = [<<"{\"error\":\"x\", \"reason\":\"y\"}">>], + mock_fail_req(put, "/6", {ok, "400", [], ErrBody}), + populate_db(Source, 6, 6, _WithAttachments = true), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({doc_write_failed, {<<"x">>, <<"y">>}}, Result), + + couch_replicator_notifier:stop(Listener). + +t_fail_doc_put_4xx_unexpected_json_error({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + ErrBody = [<<"{\"a\":\"b\"}">>], + mock_fail_req(put, "/6", {ok, "400", [], ErrBody}), + populate_db(Source, 6, 6, _WithAttachments = true), + + {error, Result} = wait_rep_result(RepId), + ?assertEqual({doc_write_failed, {400, [{<<"a">>, <<"b">>}]}}, Result), + + couch_replicator_notifier:stop(Listener). 
+ +t_fail_doc_put_4xx_invalid_json_error({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + {ok, RepId} = replicate(Source, Target), + wait_target_in_sync(Source, Target), + + {ok, Listener} = rep_result_listener(RepId), + mock_fail_req(put, "/6", {ok, "400", [], [<<"potato">>]}), + populate_db(Source, 6, 6, _WithAttachments = true), + + {error, Result} = wait_rep_result(RepId), + ?assertMatch({doc_write_failed, {invalid_json, _}}, Result), + + couch_replicator_notifier:stop(Listener). + +t_skip_doc_put_401_errors({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + populate_db(Source, 6, 6, _WithAttachments = true), + ErrBody = [<<"{\"error\":\"unauthorized\", \"reason\":\"vdu\"}">>], + mock_fail_req(put, "/6", {ok, "401", [], ErrBody}), + {ok, RepId} = replicate(Source, Target, false), + {ok, Listener} = rep_result_listener(RepId), + Res = wait_rep_result(RepId), + % Replication job should succeed + ?assertMatch({ok, {[_ | _]}}, Res), + {ok, {Props}} = Res, + History = proplists:get_value(<<"history">>, Props), + ?assertMatch([{[_ | _]}], History), + [{HistProps}] = History, + DocsWritten = proplists:get_value(<<"docs_written">>, HistProps), + DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps), + ?assertEqual(5, DocsWritten), + ?assertEqual(1, DocWriteFailures), + couch_replicator_notifier:stop(Listener). 
+ +t_skip_doc_put_403_errors({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + populate_db(Source, 6, 6, _WithAttachments = true), + ErrBody = [<<"{\"error\":\"forbidden\", \"reason\":\"vdu\"}">>], + mock_fail_req(put, "/6", {ok, "403", [], ErrBody}), + {ok, RepId} = replicate(Source, Target, false), + {ok, Listener} = rep_result_listener(RepId), + Res = wait_rep_result(RepId), + % Replication job should succeed + ?assertMatch({ok, {[_ | _]}}, Res), + {ok, {Props}} = Res, + History = proplists:get_value(<<"history">>, Props), + ?assertMatch([{[_ | _]}], History), + [{HistProps}] = History, + DocsWritten = proplists:get_value(<<"docs_written">>, HistProps), + DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps), + ?assertEqual(5, DocsWritten), + ?assertEqual(1, DocWriteFailures), + couch_replicator_notifier:stop(Listener). + +t_skip_doc_put_413_errors({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + populate_db(Source, 6, 6, _WithAttachments = true), + ErrBody = [<<"{\"error\":\"too_large\", \"reason\":\"too_large\"}">>], + mock_fail_req(put, "/6", {ok, "413", [], ErrBody}), + {ok, RepId} = replicate(Source, Target, false), + {ok, Listener} = rep_result_listener(RepId), + Res = wait_rep_result(RepId), + % Replication job should succeed + ?assertMatch({ok, {[_ | _]}}, Res), + {ok, {Props}} = Res, + History = proplists:get_value(<<"history">>, Props), + ?assertMatch([{[_ | _]}], History), + [{HistProps}] = History, + DocsWritten = proplists:get_value(<<"docs_written">>, HistProps), + DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps), + ?assertEqual(5, DocsWritten), + ?assertEqual(1, DocWriteFailures), + couch_replicator_notifier:stop(Listener). 
+ +t_skip_doc_put_415_errors({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + populate_db(Source, 6, 6, _WithAttachments = true), + ErrBody = [<<"{\"error\":\"unsupported_media_type\", \"reason\":\"bad_media\"}">>], + mock_fail_req(put, "/6", {ok, "415", [], ErrBody}), + {ok, RepId} = replicate(Source, Target, false), + {ok, Listener} = rep_result_listener(RepId), + Res = wait_rep_result(RepId), + % Replication job should succeed + ?assertMatch({ok, {[_ | _]}}, Res), + {ok, {Props}} = Res, + History = proplists:get_value(<<"history">>, Props), + ?assertMatch([{[_ | _]}], History), + [{HistProps}] = History, + DocsWritten = proplists:get_value(<<"docs_written">>, HistProps), + DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps), + ?assertEqual(5, DocsWritten), + ?assertEqual(1, DocWriteFailures), + couch_replicator_notifier:stop(Listener). + +t_skip_doc_put_invalid_attachment_name({_Ctx, {Source, Target}}) -> + populate_db(Source, 1, 5), + populate_db(Source, 6, 6, _WithAttachments = true), + ErrBody = [ + <<"{\"error\":\"bad_request\", \"reason\":\"Attachment name '_foo' starts with prohibited character '_'\"}">> + ], + mock_fail_req(put, "/6", {ok, "400", [], ErrBody}), + {ok, RepId} = replicate(Source, Target, false), + {ok, Listener} = rep_result_listener(RepId), + Res = wait_rep_result(RepId), + % Replication job should succeed + ?assertMatch({ok, {[_ | _]}}, Res), + {ok, {Props}} = Res, + History = proplists:get_value(<<"history">>, Props), + ?assertMatch([{[_ | _]}], History), + [{HistProps}] = History, + DocsWritten = proplists:get_value(<<"docs_written">>, HistProps), + DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, HistProps), + ?assertEqual(5, DocsWritten), + ?assertEqual(1, DocWriteFailures), + couch_replicator_notifier:stop(Listener). 
+ t_fail_revs_diff({_Ctx, {Source, Target}}) -> populate_db(Source, 1, 5), {ok, RepId} = replicate(Source, Target), wait_target_in_sync(Source, Target), {ok, Listener} = rep_result_listener(RepId), - mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}), + mock_fail_req(post, "/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}), populate_db(Source, 6, 6), {error, Result} = wait_rep_result(RepId), @@ -87,7 +241,7 @@ t_fail_bulk_get({_Ctx, {Source, Target}}) -> wait_target_in_sync(Source, Target), % Tolerate a 500 error - mock_fail_req("/_bulk_get", {ok, "501", [], [<<"not_implemented">>]}), + mock_fail_req(post, "/_bulk_get", {ok, "501", [], [<<"not_implemented">>]}), meck:reset(couch_replicator_api_wrap), populate_db(Source, 6, 6), wait_target_in_sync(Source, Target), @@ -95,7 +249,7 @@ t_fail_bulk_get({_Ctx, {Source, Target}}) -> ?assertEqual(1, meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6)), % Tolerate a 400 error - mock_fail_req("/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), + mock_fail_req(post, "/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}), meck:reset(couch_replicator_api_wrap), populate_db(Source, 7, 7), wait_target_in_sync(Source, Target), @@ -157,7 +311,7 @@ t_dont_start_duplicate_job({_Ctx, {Source, Target}}) -> meck:new(couch_replicator_pg, [passthrough]), Pid = pid_from_another_node(), meck:expect(couch_replicator_pg, should_start, fun(_, _) -> {no, Pid} end), - Rep = make_rep(Source, Target), + Rep = make_rep(Source, Target, true), ExpectErr = {error, {already_started, Pid}}, ?assertEqual(ExpectErr, couch_replicator_scheduler_job:start_link(Rep)). @@ -205,16 +359,19 @@ pid_from_another_node() -> ?assertEqual('A@1', node(Pid)), Pid. 
-mock_fail_req(Path, Return) -> +mock_fail_req(Method, Path, Return) -> meck:expect( ibrowse, send_req_direct, fun(W, Url, Headers, Meth, Body, Opts, TOut) -> Args = [W, Url, Headers, Meth, Body, Opts, TOut], #{path := UPath} = uri_string:parse(Url), - case lists:suffix(Path, UPath) of - true -> Return; - false -> meck:passthrough(Args) + case {lists:suffix(Path, UPath), Method == Meth} of + {true, true} -> + _ = meck:passthrough(Args), + Return; + {_, _} -> + meck:passthrough(Args) end end ). @@ -237,10 +394,18 @@ wait_rep_result(RepId) -> end. populate_db(DbName, Start, End) -> + populate_db(DbName, Start, End, false). + +populate_db(DbName, Start, End, WithAttachments) -> Docs = lists:foldl( fun(DocIdCounter, Acc) -> Id = integer_to_binary(DocIdCounter), - Doc = #doc{id = Id, body = {[]}}, + Atts = + case WithAttachments of + true -> [att(<<"att1">>, 1024, <<"app/binary">>)]; + false -> [] + end, + Doc = #doc{id = Id, body = {[]}, atts = Atts}, [Doc | Acc] end, [], @@ -248,6 +413,14 @@ populate_db(DbName, Start, End) -> ), {ok, [_ | _]} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]). +att(Name, Size, Type) -> + couch_att:new([ + {name, Name}, + {type, Type}, + {att_len, Size}, + {data, fun(Count) -> crypto:strong_rand_bytes(Count) end} + ]). + wait_target_in_sync(Source, Target) -> {ok, SourceDocCount} = fabric:get_doc_count(Source), wait_target_in_sync_loop(SourceDocCount, Target, 300). @@ -271,17 +444,20 @@ wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> end. replicate(Source, Target) -> - Rep = make_rep(Source, Target), + replicate(Source, Target, true). + +replicate(Source, Target, Continuous) -> + Rep = make_rep(Source, Target, Continuous), ok = couch_replicator_scheduler:add_job(Rep), couch_replicator_scheduler:reschedule(), {ok, Rep#rep.id}. 
-make_rep(Source, Target) -> +make_rep(Source, Target, Continuous) -> RepObject = {[ {<<"source">>, url(Source)}, {<<"target">>, url(Target)}, - {<<"continuous">>, true}, + {<<"continuous">>, Continuous}, {<<"worker_processes">>, 1}, {<<"retries_per_request">>, 1}, % Low connection timeout so _changes feed gets restarted quicker
