Repository: couchdb-couch-replicator
Updated Branches:
  refs/heads/master 0c8b69d41 -> 227cbc647


Fix stuck changes reader in clean_mailbox

Due to unfortunate timing issues it was possible for a changes reader to
get stuck in clean_mailbox reading an entire changes feed before
exiting. If the ibrowse call timed out right before ibrowse started
sending messages, clean_mailbox would keep looping until the changes
feed terminated on the source.

This caps the number of messages that can be cleaned up to a maximum of
sixteen. This limit is rather arbitrary. The cleanup was intended for when
only a couple of messages were lingering; sixteen is comfortably larger
than that without being unreasonably large.

BugzId: 49717


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/227cbc64
Tree: 
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/227cbc64
Diff: 
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/227cbc64

Branch: refs/heads/master
Commit: 227cbc6475db926ec9fb1b5cc93c149299484ce7
Parents: 0c8b69d
Author: Paul J. Davis <[email protected]>
Authored: Tue Jul 21 16:01:46 2015 -0500
Committer: Robert Newson <[email protected]>
Committed: Mon Aug 24 16:39:28 2015 +0100

----------------------------------------------------------------------
 src/couch_replicator_httpc.erl | 36 ++++++++++++++++++++++++++++++++----
 1 file changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/227cbc64/src/couch_replicator_httpc.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_httpc.erl b/src/couch_replicator_httpc.erl
index e591601..052eb98 100644
--- a/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator_httpc.erl
@@ -30,6 +30,16 @@
 -define(STREAM_STATUS, ibrowse_stream_status).
 
 
+% This limit is for the number of messages we're willing to discard
+% from an HTTP stream in clean_mailbox/1 before killing the worker
+% and returning. The original intent for clean_mailbox was to remove
+% a single message or two if the changes feed returned before fully
+% consuming the request. This threshold gives us confidence we'll
+% continue to properly close changes feeds while avoiding any case
+% where we may end up processing an unbounded number of messages.
+-define(MAX_DISCARDED_MESSAGES, 16).
+
+
 setup(#httpdb{httpc_pool = nil, url = Url, http_connections = MaxConns} = Db) 
->
     {ok, Pid} = couch_replicator_httpc_pool:start_link(Url, [{max_connections, 
MaxConns}]),
     {ok, Db#httpdb{httpc_pool = Pid}}.
@@ -169,13 +179,31 @@ process_stream_response(ReqId, Worker, HttpDb, Params, 
Callback) ->
 % on the ibrowse_req_id format. This just drops all
 % messages for the given ReqId on the floor since we're
 % no longer in the HTTP request.
-clean_mailbox({ibrowse_req_id, ReqId}) ->
+
+clean_mailbox(ReqId) ->
+    clean_mailbox(ReqId, ?MAX_DISCARDED_MESSAGES).
+
+
+clean_mailbox(_ReqId, 0) ->
+    case get(?STREAM_STATUS) of
+        {streaming, Worker} ->
+            % We kill workers that continue to stream us
+            % messages after we give up but do *not* exit
+            % our selves. This is because we may be running
+            % as an exception unwinds and we don't want to
+            % change any of that subtle logic.
+            exit(Worker, {timeout, ibrowse_stream_cleanup});
+        _ ->
+            ok
+    end,
+    ok;
+clean_mailbox({ibrowse_req_id, ReqId}, Count) when Count > 0 ->
     case get(?STREAM_STATUS) of
         streaming ->
             ibrowse:stream_next(ReqId),
             receive
                 {ibrowse_async_response, ReqId, _} ->
-                    clean_mailbox({ibrowse_req_id, ReqId});
+                    clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
                 {ibrowse_async_response_end, ReqId} ->
                     put(?STREAM_STATUS, ended),
                     ok
@@ -185,7 +213,7 @@ clean_mailbox({ibrowse_req_id, ReqId}) ->
         Status when Status == init; Status == ended ->
             receive
                 {ibrowse_async_response, ReqId, _} ->
-                    clean_mailbox({ibrowse_req_id, ReqId});
+                    clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
                 {ibrowse_async_response_end, ReqId} ->
                     put(?STREAM_STATUS, ended),
                     ok
@@ -193,7 +221,7 @@ clean_mailbox({ibrowse_req_id, ReqId}) ->
                     ok
             end
     end;
-clean_mailbox(_) ->
+clean_mailbox(_, Count) when Count > 0 ->
     ok.
 
 

Reply via email to