This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0832393  Prevent chttpd multipart zombie processes
0832393 is described below

commit 083239353e919e897b97e8a96ee07cb42ca4eccd
Author: Jan Lehnardt <j...@apache.org>
AuthorDate: Tue Feb 13 15:32:29 2018 +0100

    Prevent chttpd multipart zombie processes
    
    Occasionally it's possible to lose track of our RPC workers in the main
    multipart parsing code. This change monitors each worker process and
    then exits if all workers have exited before the parser considers itself
    finished.
    
    Fixes part of #745
---
 src/couch/src/couch_httpd_multipart.erl | 77 ++++++++++++++++++++++++++++-----
 1 file changed, 65 insertions(+), 12 deletions(-)

diff --git a/src/couch/src/couch_httpd_multipart.erl 
b/src/couch/src/couch_httpd_multipart.erl
index 6ce3c76..e556b28 100644
--- a/src/couch/src/couch_httpd_multipart.erl
+++ b/src/couch/src/couch_httpd_multipart.erl
@@ -99,12 +99,24 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, 
Waiting}) ->
         abort_parsing ->
             ok;
         {get_bytes, Ref, From} ->
-            C2 = orddict:update_counter(From, 1, Counters),
-            NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, 
[From|Waiting]}),
-            mp_parse_atts(eof, NewAcc);
+            C2 = update_writer(From, Counters),
+            case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}) of
+                abort_parsing ->
+                    ok;
+                NewAcc ->
+                    mp_parse_atts(eof, NewAcc)
+            end;
         {'DOWN', ParentRef, _, _, _} ->
-            exit(mp_reader_coordinator_died)
-        after 3600000 ->
+            exit(mp_reader_coordinator_died);
+        {'DOWN', WriterRef, _, WriterPid, _} ->
+            case remove_writer(WriterPid, WriterRef, Counters) of
+                abort_parsing ->
+                    ok;
+                C2 ->
+                    NewAcc = {Ref, Chunks, Offset, C2, Waiting -- [WriterPid]},
+                    mp_parse_atts(eof, NewAcc)
+            end
+        after 300000 ->
             ok
         end
     end.
@@ -116,12 +128,12 @@ mp_abort_parse_atts(_, _) ->
 
 maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
     receive {get_bytes, Ref, From} ->
-        NewCounters = orddict:update_counter(From, 1, Counters),
+        NewCounters = update_writer(From, Counters),
         maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]})
     after 0 ->
         % reply to as many writers as possible
         NewWaiting = lists:filter(fun(Writer) ->
-            WhichChunk = orddict:fetch(Writer, Counters),
+            {_, WhichChunk} = orddict:fetch(Writer, Counters),
             ListIndex = WhichChunk - Offset,
             if ListIndex =< length(Chunks) ->
                 Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)},
@@ -132,11 +144,11 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) 
->
         end, Waiting),
 
         % check if we can drop a chunk from the head of the list
-        case Counters of
+        SmallestIndex = case Counters of
         [] ->
-            SmallestIndex = 0;
+            0;
         _ ->
-            SmallestIndex = lists:min(element(2, lists:unzip(Counters)))
+            lists:min([C || {_WPid, {_WRef, C}} <- Counters])
         end,
         Size = length(Counters),
         N = num_mp_writers(),
@@ -149,7 +161,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
         end,
 
         % we should wait for a writer if no one has written the last chunk
-        LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]),
+        LargestIndex = lists:max([0] ++ [C || {_WPid, {_WRef, C}} <- 
Counters]),
         if LargestIndex  >= (Offset + length(Chunks)) ->
             % someone has written all possible chunks, keep moving
             {Ref, NewChunks, NewOffset, Counters, NewWaiting};
@@ -160,14 +172,55 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) 
->
                 abort_parsing;
             {'DOWN', ParentRef, _, _, _} ->
                 exit(mp_reader_coordinator_died);
+            {'DOWN', WriterRef, _, WriterPid, _} ->
+                case remove_writer(WriterPid, WriterRef, Counters) of
+                    abort_parsing ->
+                        abort_parsing;
+                    C2 ->
+                        RestWaiting = NewWaiting -- [WriterPid],
+                        NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting},
+                        maybe_send_data(NewAcc)
+                end;
             {get_bytes, Ref, X} ->
-                C2 = orddict:update_counter(X, 1, Counters),
+                C2 = update_writer(X, Counters),
                 maybe_send_data({Ref, NewChunks, NewOffset, C2, 
[X|NewWaiting]})
+            after 300000 ->
+                abort_parsing
             end
         end
     end.
 
 
+update_writer(WriterPid, Counters) ->
+    UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end,
+    InitialValue = case orddict:find(WriterPid, Counters) of
+        {ok, IV} ->
+            IV;
+        error ->
+            WriterRef = erlang:monitor(process, WriterPid),
+            {WriterRef, 1}
+    end,
+    orddict:update(WriterPid, UpdateFun, InitialValue, Counters).
+
+
+remove_writer(WriterPid, WriterRef, Counters) ->
+    case orddict:find(WriterPid, Counters) of
+        {ok, {WriterRef, _}} ->
+            case num_mp_writers() of
+                N when N > 1 ->
+                    num_mp_writers(N - 1);
+                _ ->
+                    abort_parsing
+            end;
+        {ok, _} ->
+            % We got a different ref fired for a known worker
+            abort_parsing;
+        error ->
+            % Unknown worker pid?
+            abort_parsing
+    end.
+
+
 num_mp_writers(N) ->
     erlang:put(mp_att_writers, N).
 

-- 
To stop receiving notification emails like this one, please contact
dav...@apache.org.

Reply via email to