Repository: couchdb-couch-replicator Updated Branches: refs/heads/1843-feature-bigcouch a8add8b92 -> 2db29cc1f (forced update)
Handle open_revs retries at a higher level. This patch disables the httpc client retries for the request to stream document revisions to the replicator. The retry logic at that level could end up jumbling together response body data from different requests and thoroughly confusing the multipart parser. Moving the retry logic up a level allows us to start fresh each time. BugzID: 21367 Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/3597d756 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/3597d756 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/3597d756 Branch: refs/heads/1843-feature-bigcouch Commit: 3597d756546d522f0496e745b10c8cdf43110262 Parents: 7cd0c4f Author: Adam Kocoloski <[email protected]> Authored: Tue Sep 10 22:26:14 2013 -0400 Committer: Robert Newson <[email protected]> Committed: Mon Apr 28 12:03:16 2014 +0100 ---------------------------------------------------------------------- src/couch_replicator_api_wrap.erl | 98 +++++++++++++++++++++++----------- 1 file changed, 67 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/3597d756/src/couch_replicator_api_wrap.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl index d072187..2882567 100644 --- a/src/couch_replicator_api_wrap.erl +++ b/src/couch_replicator_api_wrap.erl @@ -48,6 +48,7 @@ get_value/3 ]). +-define(MAX_WAIT, 5 * 60 * 1000). 
db_uri(#httpdb{url = Url}) -> couch_util:url_strip_password(Url); @@ -157,43 +158,78 @@ get_missing_revs(Db, IdRevs) -> +open_doc_revs(#httpdb{retries = 0} = HttpDb, Id, Revs, Options, _Fun, _Acc) -> + Path = encode_doc_id(Id), + QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]), + Url = couch_util:url_strip_password( + couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}]) + ), + ?LOG_ERROR("Replication crashing because GET ~s failed", [Url]), + exit(kaboom); open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) -> Path = encode_doc_id(Id), - QArgs = options_to_query_args( - HttpDb, Path, [revs, {open_revs, Revs} | Options]), - Self = self(), - Streamer = spawn_link(fun() -> - send_req( - HttpDb, - [{path, Path}, {qs, QArgs}, - {ibrowse_options, [{stream_to, {self(), once}}]}, - {headers, [{"Accept", "multipart/mixed"}]}], - fun(200, Headers, StreamDataFun) -> - remote_open_doc_revs_streamer_start(Self), - {<<"--">>, _, _} = couch_httpd:parse_multipart_request( - get_value("Content-Type", Headers), - StreamDataFun, - fun mp_parse_mixed/1) - end), - unlink(Self) + QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]), + {Pid, Ref} = spawn_monitor(fun() -> + Self = self(), + Callback = fun(200, Headers, StreamDataFun) -> + remote_open_doc_revs_streamer_start(Self), + {<<"--">>, _, _} = couch_httpd:parse_multipart_request( + get_value("Content-Type", Headers), + StreamDataFun, + fun mp_parse_mixed/1 + ) + end, + Streamer = spawn_link(fun() -> + Params = [ + {path, Path}, + {qs, QS}, + {ibrowse_options, [{stream_to, {self(), once}}]}, + {headers, [{"Accept", "multipart/mixed"}]} + ], + % We're setting retries to 0 here to avoid the case where the + % Streamer retries the request and ends up jumbling together two + % different response bodies. Retries are handled explicitly by + % open_doc_revs itself. 
+ send_req(HttpDb#httpdb{retries = 0}, Params, Callback) + end), + % If this process dies normally we can leave + % the Streamer process hanging around keeping an + % HTTP connection open. This is a bit of a + % hammer approach to making sure it releases + % that connection back to the pool. + spawn(fun() -> + Ref = erlang:monitor(process, Self), + receive + {'DOWN', Ref, process, Self, normal} -> + exit(Streamer, {streamer_parent_died, Self}); + {'DOWN', Ref, process, Self, _} -> + ok + end end), - % If this process dies normally we can leave - % the Streamer process hanging around keeping an - % HTTP connection open. This is a bit of a - % hammer approach to making sure it releases - % that connection back to the pool. - spawn(fun() -> - Ref = erlang:monitor(process, Self), receive - {'DOWN', Ref, process, Self, normal} -> - exit(Streamer, {streamer_parent_died, Self}); - {'DOWN', Ref, process, Self, _} -> - ok - end + {started_open_doc_revs, Ref} -> + Ret = receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc), + exit({exit_ok, Ret}) + end end), receive - {started_open_doc_revs, Ref} -> - receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc) + {'DOWN', Ref, process, Pid, {exit_ok, Ret}} -> + Ret; + {'DOWN', Ref, process, Pid, Else} -> + Url = couch_util:url_strip_password( + couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}]) + ), + #httpdb{retries = Retries, wait = Wait0} = HttpDb, + Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT), + couch_log:notice("Retrying GET to ~s in ~p seconds due to error ~s", + [Url, Wait / 1000, Else] + ), + ok = timer:sleep(Wait), + RetryDb = HttpDb#httpdb{ + retries = Retries - 1, + wait = Wait + }, + open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc) end; open_doc_revs(Db, Id, Revs, Options, Fun, Acc) -> {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
