Throttle parallel doc lookup requests

When POSTing keys to _all_docs we would spawn one doc read per key, all in
parallel, so a request with N keys kicked off N simultaneous reads. This can
push the system into swap pretty quickly, as demonstrated by some import
scripts that clients were running.
This fix just constrains the number of simultaneous document reads to a
configurable limit. The default is 10 and can be set via couch_config:

    [fabric]
    all_docs_concurrency = 25


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f2ea47a3
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f2ea47a3
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f2ea47a3

Branch: refs/heads/import
Commit: f2ea47a3427d82b77818c1b3df52a6c29483159e
Parents: ed385ea
Author: Paul J. Davis <paul.joseph.da...@gmail.com>
Authored: Thu Dec 29 23:07:38 2011 -0600
Committer: Paul J. Davis <paul.joseph.da...@gmail.com>
Committed: Mon Nov 26 13:24:47 2012 -0600

----------------------------------------------------------------------
 src/fabric_view_all_docs.erl | 87 +++++++++++++++++++++++++++------------
 1 file changed, 61 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f2ea47a3/src/fabric_view_all_docs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index 4d3d58a..b85b6c4 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -51,17 +51,32 @@ go(DbName, QueryArgs, Callback, Acc0) ->
     #view_query_args{
         direction = Dir,
         include_docs = IncludeDocs,
-        limit = Limit0,
-        skip = Skip0,
-        keys = Keys
+        limit = Limit,
+        skip = Skip,
+        keys = Keys0
     } = QueryArgs,
     {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end),
-    Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) ||
-        Id <- Keys],
-    Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end,
+    SpawnFun = fun(Key) ->
+        spawn_monitor(?MODULE, open_doc, [DbName, Key, IncludeDocs])
+    end,
+    MaxJobs = all_docs_concurrency(),
+    Keys1 = case Dir of
+        fwd -> Keys0;
+        _ -> lists:reverse(Keys0)
+    end,
+    Keys2 = case Skip < length(Keys1) of
+        true -> lists:nthtail(Skip, Keys1);
+        false -> []
+    end,
+    Keys3 = case Limit < length(Keys2) of
+        true -> lists:sublist(Keys2, Limit);
+        false -> Keys2
+    end,
     receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
         {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0),
-        {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1),
+        {ok, Acc2} = doc_receive_loop(
+            Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1
+        ),
         Callback(complete, Acc2)
     after 10000 ->
         Callback(timeout, Acc0)
@@ -137,26 +152,36 @@ merge_row(fwd, Row, Rows) ->
 merge_row(rev, Row, Rows) ->
     lists:rkeymerge(#view_row.id, [Row], Rows).
 
-doc_receive_loop([], _, _, _, Acc) ->
-    {ok, Acc};
-doc_receive_loop(_, _, 0, _, Acc) ->
-    {ok, Acc};
-doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 ->
-    receive {'DOWN', Ref, process, Pid, #view_row{}} ->
-        doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc)
-    after 10000 ->
-        timeout
-    end;
-doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) ->
-    receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
-        case Callback(fabric_view:transform_row(Row), AccIn) of
-        {ok, Acc} ->
-            doc_receive_loop(Rest, 0, Limit-1, Callback, Acc);
-        {stop, Acc} ->
-            {ok, Acc}
+all_docs_concurrency() ->
+    Value = couch_config:get("fabric", "all_docs_concurrency", "10"),
+    try
+        list_to_integer(Value)
+    catch _:_ ->
+        10
+    end.
+
+doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) ->
+    case {Keys, queue:len(Pids)} of
+    {[], 0} ->
+        {ok, AccIn};
+    {[K | RKeys], Len} when Len < MaxJobs ->
+        Pids1 = queue:in(SpawnFun(K), Pids),
+        doc_receive_loop(RKeys, Pids1, SpawnFun, MaxJobs, Callback, AccIn);
+    _ ->
+        {{value, {Pid, Ref}}, RestPids} = queue:out(Pids),
+        receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
+            case Callback(fabric_view:transform_row(Row), AccIn) of
+            {ok, Acc} ->
+                doc_receive_loop(
+                    Keys, RestPids, SpawnFun, MaxJobs, Callback, Acc
+                );
+            {stop, Acc} ->
+                cancel_read_pids(RestPids),
+                {ok, Acc}
+            end
+        after 10000 ->
+            timeout
         end
-    after 10000 ->
-        timeout
     end.
 
 open_doc(DbName, Id, IncludeDocs) ->
@@ -176,3 +201,13 @@ open_doc(DbName, Id, IncludeDocs) ->
         #view_row{key=Id, id=Id, value=Value}
     end,
     exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end).
+
+cancel_read_pids(Pids) ->
+    case queue:out(Pids) of
+    {{value, {Pid, Ref}}, RestPids} ->
+        exit(Pid, kill),
+        erlang:demonitor(Ref, [flush]),
+        cancel_read_pids(RestPids);
+    {empty, _} ->
+        ok
+    end.
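
For reference, here is a minimal standalone sketch of the bounded-concurrency
pattern that doc_receive_loop/6 implements above: keep the in-flight workers'
{Pid, Ref} monitors in a queue, spawn a new worker only while the queue is
shorter than the limit, and otherwise block on the oldest worker's 'DOWN'
message before spawning more. The module and function names here
(bounded_pmap_sketch, pmap/3, collect/2) are illustrative only and are not
part of fabric.

%% Sketch only: a generic bounded parallel map in the spirit of this patch.
-module(bounded_pmap_sketch).
-export([pmap/3]).

%% Apply Fun to every element of Items with at most MaxJobs workers running
%% at once. Results come back in the original order because workers are
%% collected in the order they were spawned.
pmap(Fun, Items, MaxJobs) when is_integer(MaxJobs), MaxJobs > 0 ->
    loop(Fun, Items, queue:new(), MaxJobs, []).

loop(Fun, [], Pending, MaxJobs, Acc) ->
    case queue:out(Pending) of
    {empty, _} ->
        %% Nothing left to spawn and nothing in flight: done.
        lists:reverse(Acc);
    {{value, {Pid, Ref}}, Rest} ->
        %% Drain the remaining workers.
        loop(Fun, [], Rest, MaxJobs, [collect(Pid, Ref) | Acc])
    end;
loop(Fun, [Item | Items], Pending, MaxJobs, Acc) ->
    case queue:len(Pending) < MaxJobs of
    true ->
        %% Below the cap: start another worker and remember its monitor.
        Worker = spawn_monitor(fun() -> exit({ok, Fun(Item)}) end),
        loop(Fun, Items, queue:in(Worker, Pending), MaxJobs, Acc);
    false ->
        %% At the cap: wait for the oldest worker before spawning this item.
        {{value, {Pid, Ref}}, Rest} = queue:out(Pending),
        loop(Fun, [Item | Items], Rest, MaxJobs, [collect(Pid, Ref) | Acc])
    end.

%% Each worker exits with {ok, Result}, which arrives here as the 'DOWN'
%% reason, the same trick open_doc/3 uses in the patch.
collect(Pid, Ref) ->
    receive
    {'DOWN', Ref, process, Pid, {ok, Result}} ->
        Result
    after 10000 ->
        timeout
    end.

With these definitions, something like
bounded_pmap_sketch:pmap(fun(Key) -> fetch(Key) end, Keys, 10) would keep at
most ten fetch/1 calls in flight at any time; fetch/1 is a stand-in for
whatever per-key work needs throttling.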