This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch improve-replicator-stream-mailbox-cleanup
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit aeed4f8b7b037dafb15882966d636a9543a91f38
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Fri May 30 18:04:52 2025 -0400

    Improve replicator client mailbox flush
    
    Previously users noticed periodic `ibrowse_stream_cleanup` crashes. Replication
    jobs would restart as expected, but it looks kind of messy and takes longer.
    
    To improve the situation a bit:
    
     * Increase the number of flushed messages. Flushing is quick so we can
     clean a few more messages than 16 if need be, so pick 100.
    
     * If we found the worker was dead, we set the stream status to `ended` but we
     didn't actually flush the mailbox, just exited with `ok`. So make sure to also
     do the flushing.
    
     * Streamer also emits `ibrowse_async_response_timeout` messages, but we never
     flushed those, so make sure to do so.
    
     * `ibrowse:stream_next(ReqId)` may fail if the request ID to Pid ets table was
     already cleaned. Previously we would then wait for 30 seconds without getting
     the response and crash the job. So make sure to catch that error, skip the
     wait, set stream status to `ended`, and then continue flushing the mailbox.
    
     * Explicitly assert `ibrowse:stream_next(ReqId)` result is `ok` in cases when
     we call it. We'll crash if it isn't and restart the job.
    
    Fix: https://github.com/apache/couchdb/issues/5554
---
 .../src/couch_replicator_httpc.erl                 | 161 +++++++++++++++++++--
 1 file changed, 148 insertions(+), 13 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl
index cd5e4d75d..c9748125b 100644
--- a/src/couch_replicator/src/couch_replicator_httpc.erl
+++ b/src/couch_replicator/src/couch_replicator_httpc.erl
@@ -38,7 +38,7 @@
 % consuming the request. This threshold gives us confidence we'll
 % continue to properly close changes feeds while avoiding any case
 % where we may end up processing an unbounded number of messages.
--define(MAX_DISCARDED_MESSAGES, 16).
+-define(MAX_DISCARDED_MESSAGES, 100).
 
 setup(Db) ->
     #httpdb{
@@ -230,7 +230,7 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) ->
                         Ok =:= 413 -> put(?STOP_HTTP_WORKER, stop);
                         true -> ok
                     end,
-                    ibrowse:stream_next(ReqId),
+                    ok = ibrowse:stream_next(ReqId),
                     try
                         Ret = Callback(Ok, Headers, StreamDataFun),
                         Ret
@@ -297,12 +297,16 @@ clean_mailbox({ibrowse_req_id, ReqId}, Count) when Count > 0 ->
                     discard_message(ReqId, Worker, Count);
                 false ->
                     put(?STREAM_STATUS, ended),
-                    ok
+                    % Worker is not alive but we may still have messages
+                    % in the mailbox from it so recurse to clean them
+                    clean_mailbox({ibrowse_req_id, ReqId}, Count)
             end;
         Status when Status == init; Status == ended ->
             receive
                 {ibrowse_async_response, ReqId, _} ->
                     clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
+                {ibrowse_async_response_timeout, ReqId} ->
+                    clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
                 {ibrowse_async_response_end, ReqId} ->
                     put(?STREAM_STATUS, ended),
                     ok
@@ -314,16 +318,26 @@ clean_mailbox(_, Count) when Count > 0 ->
     ok.
 
 discard_message(ReqId, Worker, Count) ->
-    ibrowse:stream_next(ReqId),
-    receive
-        {ibrowse_async_response, ReqId, _} ->
-            clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
-        {ibrowse_async_response_end, ReqId} ->
+    case ibrowse:stream_next(ReqId) of
+        ok ->
+            receive
+                {ibrowse_async_response, ReqId, _} ->
+                    clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
+                {ibrowse_async_response_timeout, ReqId} ->
+                    clean_mailbox({ibrowse_req_id, ReqId}, Count - 1);
+                {ibrowse_async_response_end, ReqId} ->
+                    put(?STREAM_STATUS, ended),
+                    ok
+            after 30000 ->
+                exit(Worker, {timeout, ibrowse_stream_cleanup}),
+                exit({timeout, ibrowse_stream_cleanup})
+            end;
+        {error, unknown_req_id} ->
+            % The stream is being torn down so expect to handle stream ids not
+            % being found. We don't want to sleep for 30 seconds and then exit.
+            % Just clean any left-over mailbox messages and move on.
             put(?STREAM_STATUS, ended),
-            ok
-    after 30000 ->
-        exit(Worker, {timeout, ibrowse_stream_cleanup}),
-        exit({timeout, ibrowse_stream_cleanup})
+            clean_mailbox({ibrowse_req_id, ReqId}, Count)
     end.
 
 -spec maybe_retry(any(), pid(), #httpdb{}, list()) -> no_return().
@@ -403,7 +417,7 @@ error_cause(Cause) ->
 stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) ->
     case accumulate_messages(ReqId, [], T + 500) of
         {Data, ibrowse_async_response} ->
-            ibrowse:stream_next(ReqId),
+            ok = ibrowse:stream_next(ReqId),
             {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end};
         {Data, ibrowse_async_response_end} ->
             put(?STREAM_STATUS, ended),
@@ -540,4 +554,125 @@ merge_headers_test() ->
     ?assertEqual([{"a", "y"}], merge_headers([{"A", "z"}, {"a", "y"}], [])),
     ?assertEqual([{"a", "y"}], merge_headers([], [{"A", "z"}, {"a", "y"}])).
 
+clean_mailbox_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            ?TDEF_FE(t_clean_noop),
+            ?TDEF_FE(t_clean_skip_other_messages),
+            ?TDEF_FE(t_clean_when_init),
+            ?TDEF_FE(t_clean_when_ended),
+            ?TDEF_FE(t_clean_when_streaming),
+            ?TDEF_FE(t_clean_when_streaming_dead_pid),
+            ?TDEF_FE(t_other_req_id_is_ignored)
+        ]
+    }.
+
+setup() ->
+    meck:new(ibrowse),
+    meck:expect(ibrowse, stream_next, 1, ok),
+    ok.
+
+teardown(_) ->
+    meck:unload().
+
+t_clean_noop(_) ->
+    ReqId = make_ref(),
+    ?assertEqual(ok, clean_mailbox(random_junk)),
+    meck:expect(ibrowse, stream_next, 1, {error, unknown_req_id}),
+    set_stream_status({streaming, self()}),
+    ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})),
+    set_stream_status(init),
+    ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})),
+    set_stream_status(ended),
+    ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})).
+
+t_clean_skip_other_messages(_) ->
+    set_stream_status(init),
+    self() ! other_message,
+    ReqId = make_ref(),
+    ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})),
+    ?assertEqual([other_message], flush()).
+
+t_clean_when_init(_) ->
+    set_stream_status(init),
+    ReqId = make_ref(),
+    add_all_message_types(ReqId),
+    ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})),
+    ?assertEqual([], flush()),
+    ?assertEqual(ended, stream_status()).
+
+t_clean_when_ended(_) ->
+    set_stream_status(init),
+    ReqId = make_ref(),
+    add_all_message_types(ReqId),
+    ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})),
+    ?assertEqual([], flush()),
+    ?assertEqual(ended, stream_status()).
+
+t_clean_when_streaming(_) ->
+    set_stream_status({streaming, self()}),
+    ReqId = make_ref(),
+    add_all_message_types(ReqId),
+    ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})),
+    ?assertEqual([], flush()),
+    ?assertEqual(ended, stream_status()).
+
+t_clean_when_streaming_dead_pid(_) ->
+    {Pid, Ref} = spawn_monitor(fun() -> ok end),
+    receive
+        {'DOWN', Ref, _, _, _} -> ok
+    end,
+    set_stream_status({streaming, Pid}),
+    ReqId = make_ref(),
+    add_all_message_types(ReqId),
+    ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})),
+    ?assertEqual([], flush()),
+    ?assertEqual(ended, stream_status()).
+
+t_other_req_id_is_ignored(_) ->
+    set_stream_status({streaming, self()}),
+    ReqId1 = make_ref(),
+    add_all_message_types(ReqId1),
+    ReqId2 = make_ref(),
+    add_all_message_types(ReqId2),
+    ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId1})),
+    ?assertEqual(
+        [
+            {ibrowse_async_response, ReqId2, foo},
+            {ibrowse_async_response_timeout, ReqId2},
+            {ibrowse_async_response_end, ReqId2}
+        ],
+        flush()
+    ),
+    ?assertEqual(ended, stream_status()).
+
+stream_status() ->
+    get(?STREAM_STATUS).
+
+set_stream_status(Status) ->
+    put(?STREAM_STATUS, Status).
+
+add_all_message_types(ReqId) ->
+    Messages = [
+        {ibrowse_async_response, ReqId, foo},
+        {ibrowse_async_response_timeout, ReqId},
+        {ibrowse_async_response_end, ReqId}
+    ],
+    [self() ! M || M <- Messages],
+    ok.
+
+flush() ->
+    flush([]).
+
+flush(Acc) ->
+    receive
+        Msg ->
+            flush([Msg | Acc])
+    after 0 ->
+        lists:reverse(Acc)
+    end.
+
 -endif.

Reply via email to