Repository: couchdb-couch-replicator Updated Branches: refs/heads/master 0c8b69d41 -> 227cbc647
Fix stuck changes reader in clean_mailbox Due to unfortunate timing issues it was possible for a changes reader to get stuck in clean_mailbox reading an entire changes feed before exiting. If the ibrowse call timed out right before ibrowse started sending messages then we would see clean_mailbox loop until the changes feed terminated on the source. This caps the number of messages that can be cleaned up to a maximum of sixteen. This limit is rather arbitrary. The cleanup was intended for when only a couple of messages were lingering. This is much larger than that without being insanely large. BugzId: 49717 Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/227cbc64 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/227cbc64 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/227cbc64 Branch: refs/heads/master Commit: 227cbc6475db926ec9fb1b5cc93c149299484ce7 Parents: 0c8b69d Author: Paul J. Davis <[email protected]> Authored: Tue Jul 21 16:01:46 2015 -0500 Committer: Robert Newson <[email protected]> Committed: Mon Aug 24 16:39:28 2015 +0100 ---------------------------------------------------------------------- src/couch_replicator_httpc.erl | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/227cbc64/src/couch_replicator_httpc.erl ---------------------------------------------------------------------- diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl index e591601..052eb98 100644 --- a/src/couch_replicator_httpc.erl +++ b/src/couch_replicator_httpc.erl @@ -30,6 +30,16 @@ -define(STREAM_STATUS, ibrowse_stream_status).
+% This limit is for the number of messages we're willing to discard
+% from an HTTP stream in clean_mailbox/1 before killing the worker
+% and returning. The original intent for clean_mailbox was to remove
+% a single message or two if the changes feed returned before fully
+% consuming the request. This threshold gives us confidence we'll
+% continue to properly close changes feeds while avoiding any case
+% where we may end up processing an unbounded number of messages.
+-define(MAX_DISCARDED_MESSAGES, 16).
+
+
 setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) ->
     {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, MaxConns}]),
     {ok, Db#httpdb{httpc_pool = Pid}}.
@@ -169,13 +179,31 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
 % on the ibrowse_req_id format. This just drops all
 % messages for the given ReqId on the floor since we're
 % no longer in the HTTP request.
-clean_mailbox({ibrowse_req_id, ReqId}) ->
+
+clean_mailbox(ReqId) ->
+    clean_mailbox(ReqId, ?MAX_DISCARDED_MESSAGES).
+
+
+clean_mailbox(_ReqId, 0) ->
+    case get(?STREAM_STATUS) of
+        {streaming, Worker} ->
+            % We kill workers that continue to stream us messages after we
+            % give up but do *not* exit ourselves: we may be running as an
+            % exception unwinds and must not change any of that subtle logic.
+            % NOTE(review): the Count > 0 clause matches the bare atom
+            % `streaming`, not {streaming, Worker}; confirm this can fire.
+            exit(Worker, {timeout, ibrowse_stream_cleanup});
+        _ ->
+            ok
+    end,
+    ok;
+clean_mailbox({ibrowse_req_id, ReqId}, Count) when Count > 0 ->
     case get(?STREAM_STATUS) of
     streaming ->
         ibrowse:stream_next(ReqId),
         receive
         {ibrowse_async_response, ReqId, _} ->
-            clean_mailbox({ibrowse_req_id, ReqId});
+            clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
         {ibrowse_async_response_end, ReqId} ->
             put(?STREAM_STATUS, ended),
             ok
@@ -185,7 +213,7 @@ clean_mailbox({ibrowse_req_id, ReqId}) ->
     Status when Status == init; Status == ended ->
         receive
         {ibrowse_async_response, ReqId, _} ->
-            clean_mailbox({ibrowse_req_id, ReqId});
+            clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
         {ibrowse_async_response_end, ReqId} ->
             put(?STREAM_STATUS, ended),
             ok
@@ -193,7 +221,7 @@ clean_mailbox({ibrowse_req_id, ReqId}) ->
             ok
         end
     end;
-clean_mailbox(_) ->
+clean_mailbox(_, Count) when Count > 0 ->
     ok.
