Throttle parallel doc lookup requests

When POSTing keys to _all_docs, fabric would spawn a document read in
parallel for every key in the request. With a large key list this can
drive the node into swap fairly quickly, as demonstrated by some import
scripts that clients were running.

This fix just constrains the number of simultaneous document reads to a
configurable limit. The default is 10 and it can be set via couch_config:

    [fabric]
    all_docs_concurrency = 25
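
For reference, here is a minimal, self-contained sketch of the
bounded-spawn pattern the patch applies (module, function and variable
names are illustrative only, not part of the commit): keep at most
MaxJobs monitored workers in flight and only start a new one after an
earlier worker has delivered its result via the 'DOWN' message.

    %% Sketch only: run WorkFun over Keys with at most MaxJobs workers.
    -module(bounded_spawn_sketch).
    -export([run/3]).

    run(Keys, WorkFun, MaxJobs) ->
        loop(Keys, queue:new(), WorkFun, MaxJobs, []).

    loop(Keys, Pids, WorkFun, MaxJobs, Acc) ->
        case {Keys, queue:len(Pids)} of
            {[], 0} ->
                %% No keys left and no workers in flight: done.
                lists:reverse(Acc);
            {[Key | Rest], Len} when Len < MaxJobs ->
                %% Room for another worker: spawn it and queue its monitor.
                Worker = spawn_monitor(fun() -> exit(WorkFun(Key)) end),
                loop(Rest, queue:in(Worker, Pids), WorkFun, MaxJobs, Acc);
            _ ->
                %% At the limit (or out of keys): wait on the oldest worker,
                %% whose result arrives as the exit reason of its 'DOWN'.
                {{value, {Pid, Ref}}, RestPids} = queue:out(Pids),
                receive
                    {'DOWN', Ref, process, Pid, Result} ->
                        loop(Keys, RestPids, WorkFun, MaxJobs, [Result | Acc])
                end
        end.

The real doc_receive_loop/6 in the diff below follows the same shape,
but applies skip/limit to the key list up front, feeds each row through
the view callback, and kills the remaining readers when the callback
returns {stop, Acc}.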


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f2ea47a3
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f2ea47a3
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f2ea47a3

Branch: refs/heads/import
Commit: f2ea47a3427d82b77818c1b3df52a6c29483159e
Parents: ed385ea
Author: Paul J. Davis <paul.joseph.da...@gmail.com>
Authored: Thu Dec 29 23:07:38 2011 -0600
Committer: Paul J. Davis <paul.joseph.da...@gmail.com>
Committed: Mon Nov 26 13:24:47 2012 -0600

----------------------------------------------------------------------
 src/fabric_view_all_docs.erl | 87 +++++++++++++++++++++++++++------------
 1 file changed, 61 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f2ea47a3/src/fabric_view_all_docs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index 4d3d58a..b85b6c4 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -51,17 +51,32 @@ go(DbName, QueryArgs, Callback, Acc0) ->
     #view_query_args{
         direction = Dir,
         include_docs = IncludeDocs,
-        limit = Limit0,
-        skip = Skip0,
-        keys = Keys
+        limit = Limit,
+        skip = Skip,
+        keys = Keys0
     } = QueryArgs,
     {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end),
-    Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) ||
-        Id <- Keys],
-    Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end,
+    SpawnFun = fun(Key) ->
+        spawn_monitor(?MODULE, open_doc, [DbName, Key, IncludeDocs])
+    end,
+    MaxJobs = all_docs_concurrency(),
+    Keys1 = case Dir of
+        fwd -> Keys0;
+        _ -> lists:reverse(Keys0)
+    end,
+    Keys2 = case Skip < length(Keys1) of
+        true -> lists:nthtail(Skip, Keys1);
+        false -> []
+    end,
+    Keys3 = case Limit < length(Keys2) of
+        true -> lists:sublist(Keys2, Limit);
+        false -> Keys2
+    end,
     receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
         {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0),
-        {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1),
+        {ok, Acc2} = doc_receive_loop(
+            Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1
+        ),
         Callback(complete, Acc2)
     after 10000 ->
         Callback(timeout, Acc0)
@@ -137,26 +152,36 @@ merge_row(fwd, Row, Rows) ->
 merge_row(rev, Row, Rows) ->
     lists:rkeymerge(#view_row.id, [Row], Rows).
 
-doc_receive_loop([], _, _, _, Acc) ->
-    {ok, Acc};
-doc_receive_loop(_, _, 0, _, Acc) ->
-    {ok, Acc};
-doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 ->
-    receive {'DOWN', Ref, process, Pid, #view_row{}} ->
-        doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc)
-    after 10000 ->
-        timeout
-    end;
-doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) ->
-    receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
-        case Callback(fabric_view:transform_row(Row), AccIn) of
-        {ok, Acc} ->
-            doc_receive_loop(Rest, 0, Limit-1, Callback, Acc);
-        {stop, Acc} ->
-            {ok, Acc}
+all_docs_concurrency() ->
+    Value = couch_config:get("fabric", "all_docs_concurrency", "10"),
+    try
+        list_to_integer(Value)
+    catch _:_ ->
+        10
+    end.
+
+doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) ->
+    case {Keys, queue:len(Pids)} of
+    {[], 0} ->
+        {ok, AccIn};
+    {[K | RKeys], Len} when Len < MaxJobs ->
+        Pids1 = queue:in(SpawnFun(K), Pids),
+        doc_receive_loop(RKeys, Pids1, SpawnFun, MaxJobs, Callback, AccIn);
+    _ ->
+        {{value, {Pid, Ref}}, RestPids} = queue:out(Pids),
+        receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
+            case Callback(fabric_view:transform_row(Row), AccIn) of
+            {ok, Acc} ->
+                doc_receive_loop(
+                    Keys, RestPids, SpawnFun, MaxJobs, Callback, Acc
+                );
+            {stop, Acc} ->
+                cancel_read_pids(RestPids),
+                {ok, Acc}
+            end
+        after 10000 ->
+            timeout
         end
-    after 10000 ->
-        timeout
     end.
 
 open_doc(DbName, Id, IncludeDocs) ->
@@ -176,3 +201,13 @@ open_doc(DbName, Id, IncludeDocs) ->
         #view_row{key=Id, id=Id, value=Value}
     end,
     exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end).
+
+cancel_read_pids(Pids) ->
+    case queue:out(Pids) of
+        {{value, {Pid, Ref}}, RestPids} ->
+            exit(Pid, kill),
+            erlang:demonitor(Ref, [flush]),
+            cancel_read_pids(RestPids);
+        {empty, _} ->
+            ok
+    end.
