This is an automated email from the ASF dual-hosted git repository.

ronny pushed a commit to branch nouveau4win
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 1e640aa639b03527cfd30588de01ab4fa7d55fc4
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Thu Jul 13 14:02:55 2023 -0400

    Crash replication jobs on unexpected 4xx errors
    
    Documents with attachments are written as individual HTTP PUT multipart
    requests. If a document PUT request fails with a 401, 403, or 413 error, the
    expectation is that the replicator would skip over that document, increment
    the `doc_write_failures` metric, and continue replicating. That behavior is by
    design. However, before this commit, documents were also skipped in case of
    any unknown 4xx error. This commit switches the default behavior so that
    unexpected 4xx errors do not continue. If the error is intermittent, the job
    will crash, retry, and eventually succeed. If the error is persistent,
    exponential back-off will keep it from retrying indefinitely in a tight loop.
    
    Documents will be skipped only for 401/403/413/415 HTTP errors, and for a 400
    error code with a reason related to attachment name validation failures.
    
    The second improvement in this commit is that HTTP errors and error reasons
    bubble up through the API and are emitted as error logs. This should help
    users diagnose any issues more quickly.
    
    Issue: https://github.com/apache/couchdb/issues/4676
---
 .../src/couch_replicator_api_wrap.erl              |  30 ++-
 .../src/couch_replicator_worker.erl                | 104 +++++++----
 .../couch_replicator_error_reporting_tests.erl     | 204 +++++++++++++++++++--
 3 files changed, 280 insertions(+), 58 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl 
b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index a44a79da1..fbe9538ad 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -343,6 +343,8 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, 
Acc) ->
             throw(Stub);
         {'DOWN', Ref, process, Pid, {http_request_failed, _, _, max_backoff}} 
->
             exit(max_backoff);
+        {'DOWN', Ref, process, Pid, {doc_write_failed, _} = Error} ->
+            exit(Error);
         {'DOWN', Ref, process, Pid, request_uri_too_long} ->
             NewMaxLen = get_value(max_url_len, Options, ?MAX_URL_LEN) div 2,
             case NewMaxLen < ?MIN_URL_LEN of
@@ -451,17 +453,25 @@ update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, 
Options, Type) ->
             (409, _, _) ->
                 throw(conflict);
             (Code, _, {Props}) ->
-                case {Code, get_value(<<"error">>, Props)} of
-                    {401, <<"unauthorized">>} ->
-                        throw({unauthorized, get_value(<<"reason">>, Props)});
-                    {403, <<"forbidden">>} ->
-                        throw({forbidden, get_value(<<"reason">>, Props)});
-                    {412, <<"missing_stub">>} ->
-                        throw({missing_stub, get_value(<<"reason">>, Props)});
-                    {413, _} ->
+                Error = get_value(<<"error">>, Props),
+                Reason = get_value(<<"reason">>, Props),
+                case {Code, Error, Reason} of
+                    {401, <<"unauthorized">>, _} ->
+                        throw({unauthorized, Reason});
+                    {403, <<"forbidden">>, _} ->
+                        throw({forbidden, Reason});
+                    {412, <<"missing_stub">>, _} ->
+                        throw({missing_stub, Reason});
+                    {413, _, _} ->
                         {error, request_body_too_large};
-                    {_, Error} ->
-                        {error, Error}
+                    {415, _, _} ->
+                        {error, unsupported_media_type};
+                    {400, <<"bad_request">>, <<"Attachment name ", _/binary>>} 
->
+                        {error, {invalid_attachment_name, Reason}};
+                    {_, undefined, _} ->
+                        {error, {Code, Props}};
+                    {_, _, _} ->
+                        {error, {Error, Reason}}
                 end
         end
     ).
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl 
b/src/couch_replicator/src/couch_replicator_worker.erl
index 46e4a6e94..478ad03d8 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -238,6 +238,8 @@ handle_info({'EXIT', _Pid, {revs_diff_failed, _, _} = Err}, 
State) ->
     {stop, {shutdown, Err}, State};
 handle_info({'EXIT', _Pid, {http_request_failed, _, _, _} = Err}, State) ->
     {stop, {shutdown, Err}, State};
+handle_info({'EXIT', _Pid, {doc_write_failed, _} = Err}, State) ->
+    {stop, {shutdown, Err}, State};
 handle_info({'EXIT', Pid, Reason}, State) ->
     {stop, {process_died, Pid, Reason}, State}.
 
@@ -583,44 +585,78 @@ handle_flush_docs_result({error, {bulk_docs_failed, _, _} 
= Err}, _, _) ->
 
 flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
     try couch_replicator_api_wrap:update_doc(Target, Doc, [], 
?REPLICATED_CHANGES) of
-        {ok, _} ->
-            ok;
-        Error ->
-            couch_log:error(
-                "Replicator: error writing document `~s` to `~s`: ~s",
-                [Id, couch_replicator_api_wrap:db_uri(Target), 
couch_util:to_binary(Error)]
-            ),
-            Error
+        {ok, _} -> ok;
+        Error -> handle_doc_write_error(Error, Target, Id, Pos, RevId)
     catch
-        throw:{missing_stub, _} = MissingStub ->
-            throw(MissingStub);
         throw:{Error, Reason} ->
-            couch_log:error(
-                "Replicator: couldn't write document `~s`, revision `~s`,"
-                " to target database `~s`. Error: `~s`, reason: `~s`.",
-                [
-                    Id,
-                    couch_doc:rev_to_str({Pos, RevId}),
-                    couch_replicator_api_wrap:db_uri(Target),
-                    to_binary(Error),
-                    to_binary(Reason)
-                ]
-            ),
-            {error, Error};
-        throw:Err ->
-            couch_log:error(
-                "Replicator: couldn't write document `~s`, revision `~s`,"
-                " to target database `~s`. Error: `~s`.",
-                [
-                    Id,
-                    couch_doc:rev_to_str({Pos, RevId}),
-                    couch_replicator_api_wrap:db_uri(Target),
-                    to_binary(Err)
-                ]
-            ),
-            {error, Err}
+            handle_doc_write_error({Error, Reason}, Target, Id, Pos, RevId)
     end.
 
+% In most cases we fail the replication job by re-throwing the error.
+% The only exceptions are expected validation and VDU failures
+%  401 : unauthorized
+%  403 : forbidden
+%  413 : request_body_too_large
+%  415 : unsupported_media_type
+%  400 : bad_request where reason starts with "Attachment name "
+%
+handle_doc_write_error({missing_stub, _} = MissingStub, _, _, _, _) ->
+    throw(MissingStub);
+handle_doc_write_error({error, Reason} = Error, Target, Id, _, _) when
+    Reason == request_body_too_large orelse Reason == unsupported_media_type
+->
+    couch_log:error(
+        "Replicator: skipping writing document `~s` to `~s` : ~s.",
+        [Id, couch_replicator_api_wrap:db_uri(Target), Reason]
+    ),
+    Error;
+handle_doc_write_error({error, {invalid_attachment_name, Reason}} = Error, 
Target, Id, _, _) ->
+    couch_log:error(
+        "Replicator: skipping writing document `~s` to `~s` : 
invalid_attachment_name ~s",
+        [Id, couch_replicator_api_wrap:db_uri(Target), Reason]
+    ),
+    Error;
+handle_doc_write_error({Error, Reason}, Target, Id, Pos, RevId) when
+    Error == unauthorized orelse Error == forbidden
+->
+    couch_log:error(
+        "Replicator: skipping writing document `~s`, revision `~s`,"
+        " to target database `~s`. Error: `~s`, reason: `~s`.",
+        [
+            Id,
+            couch_doc:rev_to_str({Pos, RevId}),
+            couch_replicator_api_wrap:db_uri(Target),
+            to_binary(Error),
+            to_binary(Reason)
+        ]
+    ),
+    {error, Error};
+handle_doc_write_error({error, {Error, Reason}}, Target, Id, Pos, RevId) ->
+    couch_log:error(
+        "Replicator: error writing document `~s`, revision `~s`,"
+        " to target database `~s`. Error: `~s`, reason: `~s`.",
+        [
+            Id,
+            couch_doc:rev_to_str({Pos, RevId}),
+            couch_replicator_api_wrap:db_uri(Target),
+            to_binary(Error),
+            to_binary(Reason)
+        ]
+    ),
+    exit({doc_write_failed, {Error, Reason}});
+handle_doc_write_error(Error, Target, Id, Pos, RevId) ->
+    couch_log:error(
+        "Replicator: error writing document `~s`, revision `~s`,"
+        " to target database `~s`. Error: `~s`.",
+        [
+            Id,
+            couch_doc:rev_to_str({Pos, RevId}),
+            couch_replicator_api_wrap:db_uri(Target),
+            to_binary(Error)
+        ]
+    ),
+    exit({doc_write_failed, Error}).
+
 find_missing(DocInfos, Target, Parent, #fetch_stats{} = St) ->
     {IdRevs, AllCount} = lists:foldr(
         fun
diff --git 
a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl 
b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
index d9c6a1048..30bc12c29 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_error_reporting_tests.erl
@@ -24,6 +24,14 @@ error_reporting_test_() ->
         [
             ?TDEF_FE(t_fail_bulk_docs),
             ?TDEF_FE(t_fail_changes_reader),
+            ?TDEF_FE(t_fail_doc_put_4xx_well_formed_json_error),
+            ?TDEF_FE(t_fail_doc_put_4xx_unexpected_json_error),
+            ?TDEF_FE(t_fail_doc_put_4xx_invalid_json_error),
+            ?TDEF_FE(t_skip_doc_put_401_errors),
+            ?TDEF_FE(t_skip_doc_put_403_errors),
+            ?TDEF_FE(t_skip_doc_put_413_errors),
+            ?TDEF_FE(t_skip_doc_put_415_errors),
+            ?TDEF_FE(t_skip_doc_put_invalid_attachment_name),
             ?TDEF_FE(t_fail_revs_diff),
             ?TDEF_FE(t_fail_bulk_get, 15),
             ?TDEF_FE(t_fail_changes_queue),
@@ -41,7 +49,7 @@ t_fail_bulk_docs({_Ctx, {Source, Target}}) ->
     wait_target_in_sync(Source, Target),
 
     {ok, Listener} = rep_result_listener(RepId),
-    mock_fail_req("/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
+    mock_fail_req(post, "/_bulk_docs", {ok, "403", [], [<<"{\"x\":\"y\"}">>]}),
     populate_db(Source, 6, 6),
 
     {error, Result} = wait_rep_result(RepId),
@@ -55,7 +63,7 @@ t_fail_changes_reader({_Ctx, {Source, Target}}) ->
     wait_target_in_sync(Source, Target),
 
     {ok, Listener} = rep_result_listener(RepId),
-    mock_fail_req("/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
+    mock_fail_req(get, "/_changes", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
     populate_db(Source, 6, 6),
 
     {error, Result} = wait_rep_result(RepId),
@@ -63,13 +71,159 @@ t_fail_changes_reader({_Ctx, {Source, Target}}) ->
 
     couch_replicator_notifier:stop(Listener).
 
+t_fail_doc_put_4xx_well_formed_json_error({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    {ok, RepId} = replicate(Source, Target),
+    wait_target_in_sync(Source, Target),
+
+    {ok, Listener} = rep_result_listener(RepId),
+    ErrBody = [<<"{\"error\":\"x\", \"reason\":\"y\"}">>],
+    mock_fail_req(put, "/6", {ok, "400", [], ErrBody}),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+
+    {error, Result} = wait_rep_result(RepId),
+    ?assertEqual({doc_write_failed, {<<"x">>, <<"y">>}}, Result),
+
+    couch_replicator_notifier:stop(Listener).
+
+t_fail_doc_put_4xx_unexpected_json_error({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    {ok, RepId} = replicate(Source, Target),
+    wait_target_in_sync(Source, Target),
+
+    {ok, Listener} = rep_result_listener(RepId),
+    ErrBody = [<<"{\"a\":\"b\"}">>],
+    mock_fail_req(put, "/6", {ok, "400", [], ErrBody}),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+
+    {error, Result} = wait_rep_result(RepId),
+    ?assertEqual({doc_write_failed, {400, [{<<"a">>, <<"b">>}]}}, Result),
+
+    couch_replicator_notifier:stop(Listener).
+
+t_fail_doc_put_4xx_invalid_json_error({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    {ok, RepId} = replicate(Source, Target),
+    wait_target_in_sync(Source, Target),
+
+    {ok, Listener} = rep_result_listener(RepId),
+    mock_fail_req(put, "/6", {ok, "400", [], [<<"potato">>]}),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+
+    {error, Result} = wait_rep_result(RepId),
+    ?assertMatch({doc_write_failed, {invalid_json, _}}, Result),
+
+    couch_replicator_notifier:stop(Listener).
+
+t_skip_doc_put_401_errors({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+    ErrBody = [<<"{\"error\":\"unauthorized\", \"reason\":\"vdu\"}">>],
+    mock_fail_req(put, "/6", {ok, "401", [], ErrBody}),
+    {ok, RepId} = replicate(Source, Target, false),
+    {ok, Listener} = rep_result_listener(RepId),
+    Res = wait_rep_result(RepId),
+    % Replication job should succeed
+    ?assertMatch({ok, {[_ | _]}}, Res),
+    {ok, {Props}} = Res,
+    History = proplists:get_value(<<"history">>, Props),
+    ?assertMatch([{[_ | _]}], History),
+    [{HistProps}] = History,
+    DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
+    DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, 
HistProps),
+    ?assertEqual(5, DocsWritten),
+    ?assertEqual(1, DocWriteFailures),
+    couch_replicator_notifier:stop(Listener).
+
+t_skip_doc_put_403_errors({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+    ErrBody = [<<"{\"error\":\"forbidden\", \"reason\":\"vdu\"}">>],
+    mock_fail_req(put, "/6", {ok, "403", [], ErrBody}),
+    {ok, RepId} = replicate(Source, Target, false),
+    {ok, Listener} = rep_result_listener(RepId),
+    Res = wait_rep_result(RepId),
+    % Replication job should succeed
+    ?assertMatch({ok, {[_ | _]}}, Res),
+    {ok, {Props}} = Res,
+    History = proplists:get_value(<<"history">>, Props),
+    ?assertMatch([{[_ | _]}], History),
+    [{HistProps}] = History,
+    DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
+    DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, 
HistProps),
+    ?assertEqual(5, DocsWritten),
+    ?assertEqual(1, DocWriteFailures),
+    couch_replicator_notifier:stop(Listener).
+
+t_skip_doc_put_413_errors({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+    ErrBody = [<<"{\"error\":\"too_large\", \"reason\":\"too_large\"}">>],
+    mock_fail_req(put, "/6", {ok, "413", [], ErrBody}),
+    {ok, RepId} = replicate(Source, Target, false),
+    {ok, Listener} = rep_result_listener(RepId),
+    Res = wait_rep_result(RepId),
+    % Replication job should succeed
+    ?assertMatch({ok, {[_ | _]}}, Res),
+    {ok, {Props}} = Res,
+    History = proplists:get_value(<<"history">>, Props),
+    ?assertMatch([{[_ | _]}], History),
+    [{HistProps}] = History,
+    DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
+    DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, 
HistProps),
+    ?assertEqual(5, DocsWritten),
+    ?assertEqual(1, DocWriteFailures),
+    couch_replicator_notifier:stop(Listener).
+
+t_skip_doc_put_415_errors({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+    ErrBody = [<<"{\"error\":\"unsupported_media_type\", 
\"reason\":\"bad_media\"}">>],
+    mock_fail_req(put, "/6", {ok, "415", [], ErrBody}),
+    {ok, RepId} = replicate(Source, Target, false),
+    {ok, Listener} = rep_result_listener(RepId),
+    Res = wait_rep_result(RepId),
+    % Replication job should succeed
+    ?assertMatch({ok, {[_ | _]}}, Res),
+    {ok, {Props}} = Res,
+    History = proplists:get_value(<<"history">>, Props),
+    ?assertMatch([{[_ | _]}], History),
+    [{HistProps}] = History,
+    DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
+    DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, 
HistProps),
+    ?assertEqual(5, DocsWritten),
+    ?assertEqual(1, DocWriteFailures),
+    couch_replicator_notifier:stop(Listener).
+
+t_skip_doc_put_invalid_attachment_name({_Ctx, {Source, Target}}) ->
+    populate_db(Source, 1, 5),
+    populate_db(Source, 6, 6, _WithAttachments = true),
+    ErrBody = [
+        <<"{\"error\":\"bad_request\", \"reason\":\"Attachment name '_foo' 
starts with prohibited character '_'\"}">>
+    ],
+    mock_fail_req(put, "/6", {ok, "400", [], ErrBody}),
+    {ok, RepId} = replicate(Source, Target, false),
+    {ok, Listener} = rep_result_listener(RepId),
+    Res = wait_rep_result(RepId),
+    % Replication job should succeed
+    ?assertMatch({ok, {[_ | _]}}, Res),
+    {ok, {Props}} = Res,
+    History = proplists:get_value(<<"history">>, Props),
+    ?assertMatch([{[_ | _]}], History),
+    [{HistProps}] = History,
+    DocsWritten = proplists:get_value(<<"docs_written">>, HistProps),
+    DocWriteFailures = proplists:get_value(<<"doc_write_failures">>, 
HistProps),
+    ?assertEqual(5, DocsWritten),
+    ?assertEqual(1, DocWriteFailures),
+    couch_replicator_notifier:stop(Listener).
+
 t_fail_revs_diff({_Ctx, {Source, Target}}) ->
     populate_db(Source, 1, 5),
     {ok, RepId} = replicate(Source, Target),
     wait_target_in_sync(Source, Target),
 
     {ok, Listener} = rep_result_listener(RepId),
-    mock_fail_req("/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
+    mock_fail_req(post, "/_revs_diff", {ok, "407", [], [<<"{\"x\":\"y\"}">>]}),
     populate_db(Source, 6, 6),
 
     {error, Result} = wait_rep_result(RepId),
@@ -87,7 +241,7 @@ t_fail_bulk_get({_Ctx, {Source, Target}}) ->
     wait_target_in_sync(Source, Target),
 
     % Tolerate a 500 error
-    mock_fail_req("/_bulk_get", {ok, "501", [], [<<"not_implemented">>]}),
+    mock_fail_req(post, "/_bulk_get", {ok, "501", [], 
[<<"not_implemented">>]}),
     meck:reset(couch_replicator_api_wrap),
     populate_db(Source, 6, 6),
     wait_target_in_sync(Source, Target),
@@ -95,7 +249,7 @@ t_fail_bulk_get({_Ctx, {Source, Target}}) ->
     ?assertEqual(1, meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 
6)),
 
     % Tolerate a 400 error
-    mock_fail_req("/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
+    mock_fail_req(post, "/_bulk_get", {ok, "418", [], [<<"{\"x\":\"y\"}">>]}),
     meck:reset(couch_replicator_api_wrap),
     populate_db(Source, 7, 7),
     wait_target_in_sync(Source, Target),
@@ -157,7 +311,7 @@ t_dont_start_duplicate_job({_Ctx, {Source, Target}}) ->
     meck:new(couch_replicator_pg, [passthrough]),
     Pid = pid_from_another_node(),
     meck:expect(couch_replicator_pg, should_start, fun(_, _) -> {no, Pid} end),
-    Rep = make_rep(Source, Target),
+    Rep = make_rep(Source, Target, true),
     ExpectErr = {error, {already_started, Pid}},
     ?assertEqual(ExpectErr, couch_replicator_scheduler_job:start_link(Rep)).
 
@@ -205,16 +359,19 @@ pid_from_another_node() ->
     ?assertEqual('A@1', node(Pid)),
     Pid.
 
-mock_fail_req(Path, Return) ->
+mock_fail_req(Method, Path, Return) ->
     meck:expect(
         ibrowse,
         send_req_direct,
         fun(W, Url, Headers, Meth, Body, Opts, TOut) ->
             Args = [W, Url, Headers, Meth, Body, Opts, TOut],
             #{path := UPath} = uri_string:parse(Url),
-            case lists:suffix(Path, UPath) of
-                true -> Return;
-                false -> meck:passthrough(Args)
+            case {lists:suffix(Path, UPath), Method == Meth} of
+                {true, true} ->
+                    _ = meck:passthrough(Args),
+                    Return;
+                {_, _} ->
+                    meck:passthrough(Args)
             end
         end
     ).
@@ -237,10 +394,18 @@ wait_rep_result(RepId) ->
     end.
 
 populate_db(DbName, Start, End) ->
+    populate_db(DbName, Start, End, false).
+
+populate_db(DbName, Start, End, WithAttachments) ->
     Docs = lists:foldl(
         fun(DocIdCounter, Acc) ->
             Id = integer_to_binary(DocIdCounter),
-            Doc = #doc{id = Id, body = {[]}},
+            Atts =
+                case WithAttachments of
+                    true -> [att(<<"att1">>, 1024, <<"app/binary">>)];
+                    false -> []
+                end,
+            Doc = #doc{id = Id, body = {[]}, atts = Atts},
             [Doc | Acc]
         end,
         [],
@@ -248,6 +413,14 @@ populate_db(DbName, Start, End) ->
     ),
     {ok, [_ | _]} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
 
+att(Name, Size, Type) ->
+    couch_att:new([
+        {name, Name},
+        {type, Type},
+        {att_len, Size},
+        {data, fun(Count) -> crypto:strong_rand_bytes(Count) end}
+    ]).
+
 wait_target_in_sync(Source, Target) ->
     {ok, SourceDocCount} = fabric:get_doc_count(Source),
     wait_target_in_sync_loop(SourceDocCount, Target, 300).
@@ -271,17 +444,20 @@ wait_target_in_sync_loop(DocCount, TargetName, 
RetriesLeft) ->
     end.
 
 replicate(Source, Target) ->
-    Rep = make_rep(Source, Target),
+    replicate(Source, Target, true).
+
+replicate(Source, Target, Continuous) ->
+    Rep = make_rep(Source, Target, Continuous),
     ok = couch_replicator_scheduler:add_job(Rep),
     couch_replicator_scheduler:reschedule(),
     {ok, Rep#rep.id}.
 
-make_rep(Source, Target) ->
+make_rep(Source, Target, Continuous) ->
     RepObject =
         {[
             {<<"source">>, url(Source)},
             {<<"target">>, url(Target)},
-            {<<"continuous">>, true},
+            {<<"continuous">>, Continuous},
             {<<"worker_processes">>, 1},
             {<<"retries_per_request">>, 1},
             % Low connection timeout so _changes feed gets restarted quicker

Reply via email to