Repository: couchdb-couch
Updated Branches:
  refs/heads/1843-feature-bigcouch b13052435 -> d04839eaf


Add hard limit for OS processes

Prior to this commit, there was only a soft OS process cap in BigCouch.
This commit adds a hard process cap in addition to the soft process
cap. When the hard process cap is reached, all requests for additional
processes are blocked until a process becomes available. The soft OS
process limit was left in place, so if the number of OS processes exceeds
the soft limit (and some of the processes are idle for a certain amount
of time), CouchDB will shut down some of the idle processes.

Also, the soft OS process cap previously limited the total number of OS
processes, rather than the number of OS processes associated with a
particular language. It is now applied per language, for consistency
with CouchDB's per-language hard cap.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/135dd706
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/135dd706
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/135dd706

Branch: refs/heads/1843-feature-bigcouch
Commit: 135dd706a79f124d0707d9a5ddec2ae587faeff7
Parents: b130524
Author: Benjamin Bastian <[email protected]>
Authored: Thu Feb 13 14:50:11 2014 -0800
Committer: Benjamin Bastian <[email protected]>
Committed: Mon Feb 17 10:45:43 2014 -0800

----------------------------------------------------------------------
 src/couch_proc_manager.erl | 212 +++++++++++++++++++++++++++-------------
 1 file changed, 144 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/135dd706/src/couch_proc_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl
index 8027e76..5dd000b 100644
--- a/src/couch_proc_manager.erl
+++ b/src/couch_proc_manager.erl
@@ -17,7 +17,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 
     code_change/3]).
 
--export([start_link/0, get_proc_count/0, new_proc/2, new_proc/4]).
+-export([start_link/0, get_proc_count/0, new_proc/1]).
 
 % config_listener api
 -export([handle_config_change/5]).
@@ -26,7 +26,17 @@
 
 -record(state, {
     tab,
-    config
+    config,
+    proc_counts,
+    waiting
+}).
+
+-record(client, {
+    timestamp,
+    from,
+    lang,
+    ddoc,
+    ddoc_key
 }).
 
 start_link() ->
@@ -40,7 +50,10 @@ init([]) ->
     ok = config:listen_for_changes(?MODULE, nil),
     {ok, #state{
         tab = ets:new(procs, [ordered_set, {keypos, #proc.pid}]),
-        config = get_proc_config()
+        config = get_proc_config(),
+        proc_counts = dict:new(),
+        waiting = ets:new(couch_proc_manage_waiting,
+                [ordered_set, {keypos, #client.timestamp}])
     }}.
 
 handle_call(get_table, _From, State) ->
@@ -50,12 +63,13 @@ handle_call(get_proc_count, _From, State) ->
     {reply, ets:info(State#state.tab, size), State};
 
 handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
-    {Client, _} = From,
-    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    {ClientPid, _} = From,
+    Lang = couch_util:to_binary(
+            couch_util:get_value(<<"language">>, Props, <<"javascript">>)),
     IterFun = fun(Proc, Acc) ->
         case lists:member(DDocKey, Proc#proc.ddoc_keys) of
             true ->
-                {stop, assign_proc(State#state.tab, Client, Proc)};
+                {stop, assign_proc(State#state.tab, ClientPid, Proc)};
             false ->
                 {ok, Acc}
         end
@@ -63,60 +77,41 @@ handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, 
From, State) ->
     TeachFun = fun(Proc0, Acc) ->
         try
             {ok, Proc1} = teach_ddoc(DDoc, DDocKey, Proc0),
-            {stop, assign_proc(State#state.tab, Client, Proc1)}
+            {stop, assign_proc(State#state.tab, ClientPid, Proc1)}
         catch _:_ ->
             {ok, Acc}
         end
     end,
-    try iter_procs(State#state.tab, Lang, IterFun, nil) of
-    {not_found, _} ->
-        case iter_procs(State#state.tab, Lang, TeachFun, nil) of
-        {not_found, _} ->
-            spawn_link(?MODULE, new_proc, [From, Lang, DDoc, DDocKey]),
-            {noreply, State};
-        {ok, Proc} ->
-            {reply, {ok, Proc, State#state.config}, State}
-        end;
-    {ok, Proc} ->
-        {reply, {ok, Proc, State#state.config}, State}
-    catch error:Reason ->
-        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
-        {reply, {error, Reason}, State}
-    end;
+    Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey},
+    find_proc(State, Client, [IterFun, TeachFun]);
 
-handle_call({get_proc, Lang}, {Client, _} = From, State) ->
+handle_call({get_proc, Lang}, From, State) ->
+    {ClientPid, _} = From,
     IterFun = fun(Proc, _Acc) ->
-        {stop, assign_proc(State#state.tab, Client, Proc)}
+        {stop, assign_proc(State#state.tab, ClientPid, Proc)}
     end,
-    try iter_procs(State#state.tab, Lang, IterFun, nil) of
-    {not_found, _} ->
-        spawn_link(?MODULE, new_proc, [From, Lang]),
-        {noreply, State};
-    {ok, Proc} ->
-        {reply, {ok, Proc, State#state.config}, State}
-    catch error:Reason ->
-        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
-        {reply, {error, Reason}, State}
-    end;
+    Client = #client{from=From, lang=couch_util:to_binary(Lang)},
+    find_proc(State, Client, [IterFun]);
 
-handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
+handle_call({ret_proc, #proc{client=Ref, lang=Lang0} = Proc}, _From, State) ->
     erlang:demonitor(Ref, [flush]),
+    Lang = couch_util:to_binary(Lang0),
     % We need to check if the process is alive here, as the client could be
     % handing us a #proc{} with a dead one.  We would have already removed the
     % #proc{} from our own table, so the alternative is to do a lookup in the
     % table before the insert.  Don't know which approach is cheaper.
-    return_proc(State#state.tab, Proc),
-    {reply, true, State};
+    {reply, true, return_proc(State, Proc#proc{lang=Lang})};
 
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
 
-handle_cast({os_proc_idle, Pid}, #state{tab=Tab}=State) ->
-    Limit = config:get("query_server_config", "os_process_soft_limit", "100"),
-    case ets:lookup(Tab, Pid) of
-        [#proc{client=nil}] ->
-            case ets:info(Tab, size) > list_to_integer(Limit) of
-                true ->
+handle_cast({os_proc_idle, Pid}, #state{tab=Tab, proc_counts=Counts}=State0) ->
+    Limit = list_to_integer(
+            config:get("query_server_config", "os_process_soft_limit", "100")),
+    State = case ets:lookup(Tab, Pid) of
+        [#proc{client=nil, lang=Lang}] ->
+            case dict:find(Lang, Counts) of
+                {ok, Count} when Count > Limit ->
                     ?LOG_INFO("Closing idle OS Process: ~p", [Pid]),
                     ets:delete(Tab, Pid),
                     case is_process_alive(Pid) of
@@ -125,12 +120,15 @@ handle_cast({os_proc_idle, Pid}, #state{tab=Tab}=State) ->
                             gen_server:cast(Pid, stop);
                         _ ->
                             ok
-                    end;
-                _ ->
-                    ok
+                    end,
+                    State0#state{
+                        proc_counts=dict:update_counter(Lang, -1, Counts)
+                    };
+                {ok, _} ->
+                    State0
             end;
         _ ->
-            ok
+            State0
     end,
     {noreply, State};
 handle_cast(reload_config, State) ->
@@ -142,25 +140,39 @@ handle_cast(_Msg, State) ->
 handle_info(shutdown, State) ->
     {stop, shutdown, State};
 
-handle_info({'EXIT', _, {ok, Proc0, {Client,_} = From}}, State) ->
+handle_info({'EXIT', _, {ok, Proc0, {ClientPid,_} = From}}, State) ->
     link(Proc0#proc.pid),
-    Proc = assign_proc(State#state.tab, Client, Proc0),
+    Proc = assign_proc(State#state.tab, ClientPid, Proc0),
     gen_server:reply(From, {ok, Proc, State#state.config}),
     {noreply, State};
 
 handle_info({'EXIT', Pid, Reason}, State) ->
+    #state{proc_counts=Counts, waiting=Waiting} = State,
     ?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]),
+    MaybeProc = ets:lookup(State#state.tab, Pid),
     ets:delete(State#state.tab, Pid),
-    {noreply, State};
+    case MaybeProc of
+        [#proc{lang=Lang}] ->
+            case get_waiting_client(Waiting, Lang) of
+                nil ->
+                    {noreply, State#state{
+                        proc_counts=dict:update_counter(Lang, -1, Counts)
+                    }};
+                Client ->
+                    spawn_link(?MODULE, new_proc, [Client]),
+                    {noreply, State}
+            end;
+        [] ->
+            {noreply, State}
+    end;
 
-handle_info({'DOWN', Ref, _, _, _Reason}, State) ->
-    case ets:match_object(State#state.tab, #proc{client=Ref, _='_'}) of
+handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
+    case ets:match_object(State0#state.tab, #proc{client=Ref, _='_'}) of
     [] ->
-        ok;
+        {noreply, State0};
     [#proc{} = Proc] ->
-        return_proc(State#state.tab, Proc)
-    end,
-    {noreply, State};
+        {noreply, return_proc(State0, Proc)}
+    end;
 
 handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
     erlang:send_after(5000, self(), restart_config_listener),
@@ -189,8 +201,8 @@ handle_config_change("query_server_config", _, _, _, _) ->
 handle_config_change(_, _, _, _, _) ->
     {ok, nil}.
 
-iter_procs(Tab, Lang, Fun, Acc) when is_binary(Lang) ->
-    iter_procs(Tab, binary_to_list(Lang), Fun, Acc);
+iter_procs(Tab, Lang, Fun, Acc) when is_list(Lang) ->
+    iter_procs(Tab, list_to_binary(Lang), Fun, Acc);
 iter_procs(Tab, Lang, Fun, Acc) ->
     Pattern = #proc{lang=Lang, client=nil, _='_'},
     MSpec = [{Pattern, [], ['$_']}],
@@ -216,15 +228,17 @@ iter_procs({[Proc | Rest], Continuation}, Fun, Acc0) ->
             {ok, Acc1}
     end.
 
-new_proc(From, Lang) ->
+new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
+    #client{from=From, lang=Lang} = Client,
     case new_proc_int(From, Lang) of
     {ok, Proc} ->
         exit({ok, Proc, From});
     Error ->
         gen_server:reply(From, {error, Error})
-    end.
+    end;
 
-new_proc(From, Lang, DDoc, DDocKey) ->
+new_proc(Client) ->
+    #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client,
     case new_proc_int(From, Lang) of
     {ok, NewProc} ->
         case proc_with_ddoc(DDoc, DDocKey, [NewProc]) of
@@ -267,19 +281,52 @@ make_proc(Pid, Lang, Mod) ->
     unlink(Pid),
     {ok, Proc}.
 
-assign_proc(Tab, Client, #proc{client=nil}=Proc0) ->
-    Proc = Proc0#proc{client = erlang:monitor(process, Client)},
+assign_proc(Tab, ClientPid, #proc{client=nil}=Proc0) when is_pid(ClientPid) ->
+    Proc = Proc0#proc{client = erlang:monitor(process, ClientPid)},
     ets:insert(Tab, Proc),
-    Proc.
+    Proc;
+assign_proc(Tab, #client{}=Client, #proc{client=nil}=Proc) ->
+    {Pid, _} = Client#client.from,
+    assign_proc(Tab, Pid, Proc).
 
-return_proc(Tab, #proc{pid=Pid} = Proc) ->
+return_proc(State, #proc{pid=Pid, lang=Lang} = Proc) ->
+    #state{tab=Tab, waiting=Waiting} = State,
     case is_process_alive(Pid) of true ->
-        gen_server:cast(Pid, garbage_collect),
-        ets:insert(Tab, Proc#proc{client=nil});
+        case get_waiting_client(Waiting, Lang) of
+            nil ->
+                gen_server:cast(Pid, garbage_collect),
+                ets:insert(Tab, Proc#proc{client=nil}),
+                State;
+            #client{}=Client ->
+                From = Client#client.from,
+                assign_proc(Tab, Client, Proc#proc{client=nil}),
+                gen_server:reply(From, {ok, Proc, State#state.config}),
+                State
+        end;
     false ->
-        ets:delete(Tab, Pid)
+        ets:delete(Tab, Pid),
+        case get_waiting_client(Waiting, Lang) of
+            nil ->
+                State;
+            #client{}=Client ->
+                maybe_spawn_proc(State, Client)
+        end
     end.
 
+get_waiting_client(Tab, Lang) when is_list(Lang) ->
+    get_waiting_client(Tab, couch_util:to_binary(Lang));
+get_waiting_client(Tab, Lang) ->
+    case ets:match_object(Tab, #client{lang=Lang, _='_'}, 1) of
+        '$end_of_table' ->
+            nil;
+        {[#client{}=Client], _} ->
+            ets:delete(Tab, Client#client.timestamp),
+            Client
+    end.
+
+add_waiting_client(Tab, Client) ->
+    ets:insert(Tab, Client#client{timestamp=now()}).
+
 get_proc_config() ->
     Limit = config:get("query_server_config", "reduce_limit", "true"),
     Timeout = config:get("couchdb", "os_process_timeout", "5000"),
@@ -317,3 +364,32 @@ teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, 
#proc{ddoc_keys=Keys}=Proc) ->
     Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
     % add ddoc to the proc
     {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
+
+maybe_spawn_proc(State, Client) ->
+    #state{proc_counts=Counts, waiting=Waiting} = State,
+    #client{lang=Lang} = Client,
+    Limit = list_to_integer(config:get(
+                "query_server_config", "os_process_limit", "100")),
+    case dict:find(Lang, Counts) of
+    {ok, Limit} ->
+        add_waiting_client(Waiting, Client),
+        State;
+    _ ->
+        spawn_link(?MODULE, new_proc, [Client]),
+        State#state{
+            proc_counts=dict:update_counter(Lang, 1, Counts)
+        }
+    end.
+
+find_proc(State, Client, [Fun|FindFuns]) ->
+    try iter_procs(State#state.tab, Client#client.lang, Fun, nil) of
+    {not_found, _} ->
+        find_proc(State, Client, FindFuns);
+    {ok, Proc} ->
+        {reply, {ok, Proc, State#state.config}, State}
+    catch error:Reason ->
+        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
+        {reply, {error, Reason}, State}
+    end;
+find_proc(State, Client, []) ->
+    {noreply, maybe_spawn_proc(State, Client)}.

Reply via email to