This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature/user-partitioned-databases-davisp in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 0990a9c6cfc02e4a81c886442600a1e79f23706c Author: Paul J. Davis <paul.joseph.da...@gmail.com> AuthorDate: Thu Oct 25 14:27:32 2018 -0500 Optimize offset/limit for partition queries Now that a single shard handles the entire response we can optimize work normally done in the coordinator by moving it to the RPC worker which then removes the need to send an extra `skip` number of rows to the coordinator. Co-authored-by: Robert Newson <rnew...@apache.org> --- src/fabric/src/fabric_rpc.erl | 12 +++--------- src/fabric/src/fabric_view.erl | 16 ++++++++++++++++ src/fabric/src/fabric_view_all_docs.erl | 4 ++-- src/fabric/src/fabric_view_map.erl | 5 +++-- src/fabric/src/fabric_view_reduce.erl | 7 ++++--- 5 files changed, 28 insertions(+), 16 deletions(-) diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 873e0c5..4ba13bb 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -97,9 +97,8 @@ changes(DbName, Options, StartVector, DbOptions) -> all_docs(DbName, Options, Args0) -> case fabric_util:upgrade_mrargs(Args0) of - #mrargs{keys=undefined} = Args1 -> + #mrargs{keys=undefined} = Args -> set_io_priority(DbName, Options), - Args = fix_skip_and_limit(Args1), {ok, Db} = get_or_create_db(DbName, Options), CB = get_view_cb(Args), couch_mrview:query_all_docs(Db, Args, CB, Args) @@ -123,7 +122,7 @@ map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> map_view(DbName, DDoc, ViewName, Args0, DbOptions); map_view(DbName, DDoc, ViewName, Args0, DbOptions) -> set_io_priority(DbName, DbOptions), - Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)), + Args = fabric_util:upgrade_mrargs(Args0), {ok, Db} = get_or_create_db(DbName, DbOptions), CB = get_view_cb(Args), couch_mrview:query_view(Db, DDoc, ViewName, Args, CB, Args). 
@@ -137,16 +136,11 @@ reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> reduce_view(DbName, DDoc, ViewName, Args0, DbOptions); reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) -> set_io_priority(DbName, DbOptions), - Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)), + Args = fabric_util:upgrade_mrargs(Args0), {ok, Db} = get_or_create_db(DbName, DbOptions), VAcc0 = #vacc{db=Db}, couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0). -fix_skip_and_limit(Args) -> - #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args, - % the coordinator needs to finalize each row, so make sure the shards don't - Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}. - create_db(DbName) -> create_db(DbName, []). diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index 69c1e20..8117894 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -16,6 +16,7 @@ transform_row/1, keydict/1, extract_view/4, get_shards/2, check_down_shards/2, handle_worker_exit/3, get_shard_replacements/2, maybe_update_others/5]). +-export([fix_skip_and_limit/1]). -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). @@ -372,6 +373,21 @@ get_shard_replacements(DbName, UsedShards0) -> end end, [], UsedShards). +-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}. +fix_skip_and_limit(#mrargs{} = Args) -> + {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of + undefined -> + #mrargs{skip=Skip, limit=Limit}=Args, + {Args, Args#mrargs{skip=0, limit=Skip+Limit}}; + _Partition -> + {Args#mrargs{skip=0}, Args} + end, + %% the coordinator needs to finalize each row, so make sure the shards don't + {CoordArgs, remove_finalizer(WorkerArgs)}. + +remove_finalizer(Args) -> + couch_mrview_util:set_extra(Args, finalizer, null). 
+ % unit test is_progress_possible_test() -> EndPoint = 2 bsl 31, diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index e3c6dfb..75c53de 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -24,13 +24,13 @@ go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) -> DbName = fabric:dbname(Db), Shards = shards(Db, QueryArgs), Workers0 = fabric_util:submit_jobs( - Shards, fabric_rpc, all_docs, [Options, QueryArgs]), + Shards, fabric_rpc, all_docs, [Options, WorkerArgs]), RexiMon = fabric_util:create_monitors(Workers0), try case fabric_util:stream_start(Workers0, #shard.ref) of {ok, Workers} -> try - go(DbName, Options, Workers, QueryArgs, Callback, Acc) + go(DbName, Options, Workers, CoordArgs, Callback, Acc) after fabric_util:cleanup(Workers) end; diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index 1648623..bc6e15d 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -27,10 +27,11 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) -> DbName = fabric:dbname(Db), Shards = fabric_view:get_shards(Db, Args), + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), - RPCArgs = [DocIdAndRev, View, Args, Options], + RPCArgs = [DocIdAndRev, View, WorkerArgs, Options], StartFun = fun(Shard) -> hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs)) end, @@ -42,7 +43,7 @@ go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try - go(DbName, Workers, VInfo, Args, Callback, Acc) + go(DbName, Workers, VInfo, CoordArgs, Callback, Acc) after fabric_util:cleanup(Workers) 
end; diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index 7acc67c..712ed24 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -25,9 +25,10 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) - go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> DbName = fabric:dbname(Db), - DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), - RPCArgs = [DocIdAndRev, VName, Args], Shards = fabric_view:get_shards(Db, Args), + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), + DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), + RPCArgs = [DocIdAndRev, VName, WorkerArgs], fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), StartFun = fun(Shard) -> @@ -41,7 +42,7 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try - go2(DbName, Workers, VInfo, Args, Callback, Acc) + go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc) after fabric_util:cleanup(Workers) end;