This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/master by this push: new 0832393 Prevent chttpd multipart zombie processes 0832393 is described below commit 083239353e919e897b97e8a96ee07cb42ca4eccd Author: Jan Lehnardt <j...@apache.org> AuthorDate: Tue Feb 13 15:32:29 2018 +0100 Prevent chttpd multipart zombie processes Occasionally it's possible to lose track of our RPC workers in the main multipart parsing code. This change monitors each worker process and then exits if all workers have exited before the parser considers itself finished. Fixes part of #745 --- src/couch/src/couch_httpd_multipart.erl | 77 ++++++++++++++++++++++++++++----- 1 file changed, 65 insertions(+), 12 deletions(-) diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl index 6ce3c76..e556b28 100644 --- a/src/couch/src/couch_httpd_multipart.erl +++ b/src/couch/src/couch_httpd_multipart.erl @@ -99,12 +99,24 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) -> abort_parsing -> ok; {get_bytes, Ref, From} -> - C2 = orddict:update_counter(From, 1, Counters), - NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}), - mp_parse_atts(eof, NewAcc); + C2 = update_writer(From, Counters), + case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}) of + abort_parsing -> + ok; + NewAcc -> + mp_parse_atts(eof, NewAcc) + end; {'DOWN', ParentRef, _, _, _} -> - exit(mp_reader_coordinator_died) - after 3600000 -> + exit(mp_reader_coordinator_died); + {'DOWN', WriterRef, _, WriterPid, _} -> + case remove_writer(WriterPid, WriterRef, Counters) of + abort_parsing -> + ok; + C2 -> + NewAcc = {Ref, Chunks, Offset, C2, Waiting -- [WriterPid]}, + mp_parse_atts(eof, NewAcc) + end + after 300000 -> ok end end. 
@@ -116,12 +128,12 @@ mp_abort_parse_atts(_, _) -> maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> receive {get_bytes, Ref, From} -> - NewCounters = orddict:update_counter(From, 1, Counters), + NewCounters = update_writer(From, Counters), maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]}) after 0 -> % reply to as many writers as possible NewWaiting = lists:filter(fun(Writer) -> - WhichChunk = orddict:fetch(Writer, Counters), + {_, WhichChunk} = orddict:fetch(Writer, Counters), ListIndex = WhichChunk - Offset, if ListIndex =< length(Chunks) -> Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)}, @@ -132,11 +144,11 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> end, Waiting), % check if we can drop a chunk from the head of the list - case Counters of + SmallestIndex = case Counters of [] -> - SmallestIndex = 0; + 0; _ -> - SmallestIndex = lists:min(element(2, lists:unzip(Counters))) + lists:min([C || {_WPid, {_WRef, C}} <- Counters]) end, Size = length(Counters), N = num_mp_writers(), @@ -149,7 +161,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> end, % we should wait for a writer if no one has written the last chunk - LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]), + LargestIndex = lists:max([0] ++ [C || {_WPid, {_WRef, C}} <- Counters]), if LargestIndex >= (Offset + length(Chunks)) -> % someone has written all possible chunks, keep moving {Ref, NewChunks, NewOffset, Counters, NewWaiting}; @@ -160,14 +172,55 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> abort_parsing; {'DOWN', ParentRef, _, _, _} -> exit(mp_reader_coordinator_died); + {'DOWN', WriterRef, _, WriterPid, _} -> + case remove_writer(WriterPid, WriterRef, Counters) of + abort_parsing -> + abort_parsing; + C2 -> + RestWaiting = NewWaiting -- [WriterPid], + NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting}, + maybe_send_data(NewAcc) + end; {get_bytes, Ref, X} -> - C2 = orddict:update_counter(X, 1, 
Counters), + C2 = update_writer(X, Counters), maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]}) + after 300000 -> + abort_parsing end end end. +update_writer(WriterPid, Counters) -> + UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end, + InitialValue = case orddict:find(WriterPid, Counters) of + {ok, IV} -> + IV; + error -> + WriterRef = erlang:monitor(process, WriterPid), + {WriterRef, 1} + end, + orddict:update(WriterPid, UpdateFun, InitialValue, Counters). + + +remove_writer(WriterPid, WriterRef, Counters) -> + case orddict:find(WriterPid, Counters) of + {ok, {WriterRef, _}} -> + case num_mp_writers() of + N when N > 1 -> + num_mp_writers(N - 1); + _ -> + abort_parsing + end; + {ok, _} -> + % We got a different ref fired for a known worker + abort_parsing; + error -> + % Unknown worker pid? + abort_parsing + end. + + num_mp_writers(N) -> erlang:put(mp_att_writers, N). -- To stop receiving notification emails like this one, please contact dav...@apache.org.