Updated Branches: refs/heads/master 88c52b232 -> 8ccf696f8
Consume all ibrowse messages before stream_next

The flow control in ibrowse's async response streams is tricky. We call stream_next to pull more data off the socket, but it seems that ibrowse will sometimes split that data into multiple messages. If we call stream_next for each message we process we end up with an overflowing mailbox. This patch changes the consumer so that it clears out the mailbox before calling stream_next.

Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/8ccf696f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/8ccf696f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/8ccf696f

Branch: refs/heads/master
Commit: 8ccf696f833a0f0a453d733807356501ae5b355e
Parents: 88c52b2
Author: Adam Kocolosk <[email protected]>
Authored: Wed Oct 31 07:35:08 2012 -0400
Committer: Bob Dionne <[email protected]>
Committed: Wed Oct 31 07:35:08 2012 -0400

----------------------------------------------------------------------
 .../src/couch_replicator_httpc.erl | 25 ++++++++++----
 1 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/couchdb/blob/8ccf696f/src/couch_replicator/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index 6804448..8773383 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -185,22 +185,33 @@ error_cause(Cause) ->
 
 
 stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
+    case accumulate_messages(ReqId, [], T + 500) of
+    {Data, ibrowse_async_response} ->
+        ibrowse:stream_next(ReqId),
+        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
+    {Data, ibrowse_async_response_end} ->
+        {Data, fun() -> throw({maybe_retry_req, more_data_expected}) end}
+    end.
+
+accumulate_messages(ReqId, Acc, Timeout) ->
     receive
     {ibrowse_async_response, ReqId, {error, Error}} ->
         throw({maybe_retry_req, Error});
     {ibrowse_async_response, ReqId, <<>>} ->
-        ibrowse:stream_next(ReqId),
-        stream_data_self(HttpDb, Params, Worker, ReqId, Cb);
+        accumulate_messages(ReqId, Acc, Timeout);
     {ibrowse_async_response, ReqId, Data} ->
-        ibrowse:stream_next(ReqId),
-        {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
+        accumulate_messages(ReqId, [Data | Acc], 0);
     {ibrowse_async_response_end, ReqId} ->
-        {<<>>, fun() -> throw({maybe_retry_req, more_data_expected}) end}
-    after T + 500 ->
+        {iolist_to_binary(lists:reverse(Acc)), ibrowse_async_response_end}
+    after Timeout ->
         % Note: ibrowse should always reply with timeouts, but this doesn't
         % seem to be always true when there's a very high rate of requests
         % and many open connections.
-        throw({maybe_retry_req, timeout})
+        if Acc =:= [] ->
+            throw({maybe_retry_req, timeout});
+        true ->
+            {iolist_to_binary(lists:reverse(Acc)), ibrowse_async_response}
+        end
     end.
