This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch improve-replicator-stream-mailbox-cleanup in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit aeed4f8b7b037dafb15882966d636a9543a91f38 Author: Nick Vatamaniuc <[email protected]> AuthorDate: Fri May 30 18:04:52 2025 -0400 Improve replicator client mailbox flush Previously users noticed periodic `ibrowse_stream_cleanup` crashes. Replication jobs would restart as expected, but it looks kind of messy and takes longer. To improve the situation a bit: * Increase the number of flushed messages. Flushing is quick so we can clean a few more messages than 16 if need be, so pick 100. * If we found the worker was dead, we set the stream status to `ended` but we didn't actually flush the mailbox, just exited with `ok`. So make sure to also do the flushing. * Streamer also emits `ibrowse_async_response_timeout` messages, but we never flushed those, so make sure to do so. * `ibrowse:stream_next(ReqId)` may fail if the request ID to Pid ets table was already cleaned. Previously we would then wait for 30 seconds without getting the response and crash the job. So make sure to catch that error, skip the wait, set stream status to `ended`, and then continue flushing the mailbox. * Explicitly assert `ibrowse:stream_next(ReqId)` result is `ok` in cases when we call it. We'll crash if it isn't and restart the job. Fix: https://github.com/apache/couchdb/issues/5554 --- .../src/couch_replicator_httpc.erl | 161 +++++++++++++++++++-- 1 file changed, 148 insertions(+), 13 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_httpc.erl b/src/couch_replicator/src/couch_replicator_httpc.erl index cd5e4d75d..c9748125b 100644 --- a/src/couch_replicator/src/couch_replicator_httpc.erl +++ b/src/couch_replicator/src/couch_replicator_httpc.erl @@ -38,7 +38,7 @@ % consuming the request. This threshold gives us confidence we'll % continue to properly close changes feeds while avoiding any case % where we may end up processing an unbounded number of messages. --define(MAX_DISCARDED_MESSAGES, 16). +-define(MAX_DISCARDED_MESSAGES, 100). 
setup(Db) -> #httpdb{ @@ -230,7 +230,7 @@ process_stream_response(ReqId, Worker, HttpDb, Params, Callback) -> Ok =:= 413 -> put(?STOP_HTTP_WORKER, stop); true -> ok end, - ibrowse:stream_next(ReqId), + ok = ibrowse:stream_next(ReqId), try Ret = Callback(Ok, Headers, StreamDataFun), Ret @@ -297,12 +297,16 @@ clean_mailbox({ibrowse_req_id, ReqId}, Count) when Count > 0 -> discard_message(ReqId, Worker, Count); false -> put(?STREAM_STATUS, ended), - ok + % Worker is not alive but we may still have messages + % in the mailbox from it so recurse to clean them + clean_mailbox({ibrowse_req_id, ReqId}, Count) end; Status when Status == init; Status == ended -> receive {ibrowse_async_response, ReqId, _} -> clean_mailbox({ibrowse_req_id, ReqId}, Count - 1); + {ibrowse_async_response_timeout, ReqId} -> + clean_mailbox({ibrowse_req_id, ReqId}, Count - 1); {ibrowse_async_response_end, ReqId} -> put(?STREAM_STATUS, ended), ok @@ -314,16 +318,26 @@ clean_mailbox(_, Count) when Count > 0 -> ok. discard_message(ReqId, Worker, Count) -> - ibrowse:stream_next(ReqId), - receive - {ibrowse_async_response, ReqId, _} -> - clean_mailbox({ibrowse_req_id, ReqId}, Count - 1); - {ibrowse_async_response_end, ReqId} -> + case ibrowse:stream_next(ReqId) of + ok -> + receive + {ibrowse_async_response, ReqId, _} -> + clean_mailbox({ibrowse_req_id, ReqId}, Count - 1); + {ibrowse_async_response_timeout, ReqId} -> + clean_mailbox({ibrowse_req_id, ReqId}, Count - 1); + {ibrowse_async_response_end, ReqId} -> + put(?STREAM_STATUS, ended), + ok + after 30000 -> + exit(Worker, {timeout, ibrowse_stream_cleanup}), + exit({timeout, ibrowse_stream_cleanup}) + end; + {error, unknown_req_id} -> + % The stream is being torn down so expect to handle stream ids not + % being found. We don't want to sleep for 30 seconds and then exit. + % Just clean any left-over mailbox messages and move on. 
put(?STREAM_STATUS, ended), - ok - after 30000 -> - exit(Worker, {timeout, ibrowse_stream_cleanup}), - exit({timeout, ibrowse_stream_cleanup}) + clean_mailbox({ibrowse_req_id, ReqId}, Count) end. -spec maybe_retry(any(), pid(), #httpdb{}, list()) -> no_return(). @@ -403,7 +417,7 @@ error_cause(Cause) -> stream_data_self(#httpdb{timeout = T} = HttpDb, Params, Worker, ReqId, Cb) -> case accumulate_messages(ReqId, [], T + 500) of {Data, ibrowse_async_response} -> - ibrowse:stream_next(ReqId), + ok = ibrowse:stream_next(ReqId), {Data, fun() -> stream_data_self(HttpDb, Params, Worker, ReqId, Cb) end}; {Data, ibrowse_async_response_end} -> put(?STREAM_STATUS, ended), @@ -540,4 +554,125 @@ merge_headers_test() -> ?assertEqual([{"a", "y"}], merge_headers([{"A", "z"}, {"a", "y"}], [])), ?assertEqual([{"a", "y"}], merge_headers([], [{"A", "z"}, {"a", "y"}])). +clean_mailbox_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_clean_noop), + ?TDEF_FE(t_clean_skip_other_messages), + ?TDEF_FE(t_clean_when_init), + ?TDEF_FE(t_clean_when_ended), + ?TDEF_FE(t_clean_when_streaming), + ?TDEF_FE(t_clean_when_streaming_dead_pid), + ?TDEF_FE(t_other_req_id_is_ignored) + ] + }. + +setup() -> + meck:new(ibrowse), + meck:expect(ibrowse, stream_next, 1, ok), + ok. + +teardown(_) -> + meck:unload(). + +t_clean_noop(_) -> + ReqId = make_ref(), + ?assertEqual(ok, clean_mailbox(random_junk)), + meck:expect(ibrowse, stream_next, 1, {error, unknown_req_id}), + set_stream_status({streaming, self()}), + ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})), + set_stream_status(init), + ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})), + set_stream_status(ended), + ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})). + +t_clean_skip_other_messages(_) -> + set_stream_status(init), + self() ! other_message, + ReqId = make_ref(), + ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})), + ?assertEqual([other_message], flush()). 
+ +t_clean_when_init(_) -> + set_stream_status(init), + ReqId = make_ref(), + add_all_message_types(ReqId), + ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})), + ?assertEqual([], flush()), + ?assertEqual(ended, stream_status()). + +t_clean_when_ended(_) -> + set_stream_status(ended), + ReqId = make_ref(), + add_all_message_types(ReqId), + ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})), + ?assertEqual([], flush()), + ?assertEqual(ended, stream_status()). + +t_clean_when_streaming(_) -> + set_stream_status({streaming, self()}), + ReqId = make_ref(), + add_all_message_types(ReqId), + ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})), + ?assertEqual([], flush()), + ?assertEqual(ended, stream_status()). + +t_clean_when_streaming_dead_pid(_) -> + {Pid, Ref} = spawn_monitor(fun() -> ok end), + receive + {'DOWN', Ref, _, _, _} -> ok + end, + set_stream_status({streaming, Pid}), + ReqId = make_ref(), + add_all_message_types(ReqId), + ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId})), + ?assertEqual([], flush()), + ?assertEqual(ended, stream_status()). + +t_other_req_id_is_ignored(_) -> + set_stream_status({streaming, self()}), + ReqId1 = make_ref(), + add_all_message_types(ReqId1), + ReqId2 = make_ref(), + add_all_message_types(ReqId2), + ?assertEqual(ok, clean_mailbox({ibrowse_req_id, ReqId1})), + ?assertEqual( + [ + {ibrowse_async_response, ReqId2, foo}, + {ibrowse_async_response_timeout, ReqId2}, + {ibrowse_async_response_end, ReqId2} + ], + flush() + ), + ?assertEqual(ended, stream_status()). + +stream_status() -> + get(?STREAM_STATUS). + +set_stream_status(Status) -> + put(?STREAM_STATUS, Status). + +add_all_message_types(ReqId) -> + Messages = [ + {ibrowse_async_response, ReqId, foo}, + {ibrowse_async_response_timeout, ReqId}, + {ibrowse_async_response_end, ReqId} + ], + [self() ! M || M <- Messages], + ok. + +flush() -> + flush([]). + +flush(Acc) -> + receive + Msg -> + flush([Msg | Acc]) + after 0 -> + lists:reverse(Acc) + end. 
+ -endif.
