This is an automated email from the ASF dual-hosted git repository. davisp pushed a commit to branch feature/user-partitioned-databases-davisp in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit f6a9317fbeed2d6c4e43f454b5be0b2ebbda42d9 Author: Paul J. Davis <[email protected]> AuthorDate: Thu Oct 25 14:27:32 2018 -0500 Optimize offset/limit for partition queries Now that a single shard handles the entire response, we can optimize work normally done in the coordinator by moving it to the RPC worker, which removes the need to send an extra `skip` number of rows to the coordinator. Co-authored-by: Robert Newson <[email protected]> --- src/fabric/src/fabric_rpc.erl | 12 +++--------- src/fabric/src/fabric_view.erl | 16 ++++++++++++++++ src/fabric/src/fabric_view_all_docs.erl | 5 +++-- src/fabric/src/fabric_view_map.erl | 5 +++-- src/fabric/src/fabric_view_reduce.erl | 7 ++++--- 5 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 873e0c5..4ba13bb 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -97,9 +97,8 @@ changes(DbName, Options, StartVector, DbOptions) -> all_docs(DbName, Options, Args0) -> case fabric_util:upgrade_mrargs(Args0) of - #mrargs{keys=undefined} = Args1 -> + #mrargs{keys=undefined} = Args -> set_io_priority(DbName, Options), - Args = fix_skip_and_limit(Args1), {ok, Db} = get_or_create_db(DbName, Options), CB = get_view_cb(Args), couch_mrview:query_all_docs(Db, Args, CB, Args) @@ -123,7 +122,7 @@ map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> map_view(DbName, DDoc, ViewName, Args0, DbOptions); map_view(DbName, DDoc, ViewName, Args0, DbOptions) -> set_io_priority(DbName, DbOptions), - Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)), + Args = fabric_util:upgrade_mrargs(Args0), {ok, Db} = get_or_create_db(DbName, DbOptions), CB = get_view_cb(Args), couch_mrview:query_view(Db, DDoc, ViewName, Args, CB, Args). 
@@ -137,16 +136,11 @@ reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> reduce_view(DbName, DDoc, ViewName, Args0, DbOptions); reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) -> set_io_priority(DbName, DbOptions), - Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)), + Args = fabric_util:upgrade_mrargs(Args0), {ok, Db} = get_or_create_db(DbName, DbOptions), VAcc0 = #vacc{db=Db}, couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0). -fix_skip_and_limit(Args) -> - #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args, - % the coordinator needs to finalize each row, so make sure the shards don't - Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}. - create_db(DbName) -> create_db(DbName, []). diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index 81eb6f0..70d6c06 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -16,6 +16,7 @@ transform_row/1, keydict/1, extract_view/4, get_shards/2, check_down_shards/2, handle_worker_exit/3, get_shard_replacements/2, maybe_update_others/5]). +-export([fix_skip_and_limit/1]). -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). @@ -372,6 +373,21 @@ get_shard_replacements(DbName, UsedShards0) -> end end, [], UsedShards). +-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}. +fix_skip_and_limit(#mrargs{} = Args) -> + {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of + undefined -> + #mrargs{skip=Skip, limit=Limit}=Args, + {Args, Args#mrargs{skip=0, limit=Skip+Limit}}; + _Partition -> + {Args#mrargs{skip=0}, Args} + end, + %% the coordinator needs to finalize each row, so make sure the shards don't + {CoordArgs, remove_finalizer(WorkerArgs)}. + +remove_finalizer(Args) -> + couch_mrview_util:set_extra(Args, finalizer, null). 
+ % unit test is_progress_possible_test() -> EndPoint = 2 bsl 31, diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index 3aaabe4..bae11da 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -21,16 +21,17 @@ -include_lib("couch_mrview/include/couch_mrview.hrl"). go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) -> + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs), DbName = fabric:dbname(Db), Shards = shards(Db, QueryArgs), Workers0 = fabric_util:submit_jobs( - Shards, fabric_rpc, all_docs, [Options, QueryArgs]), + Shards, fabric_rpc, all_docs, [Options, WorkerArgs]), RexiMon = fabric_util:create_monitors(Workers0), try case fabric_util:stream_start(Workers0, #shard.ref) of {ok, Workers} -> try - go(DbName, Options, Workers, QueryArgs, Callback, Acc) + go(DbName, Options, Workers, CoordArgs, Callback, Acc) after fabric_util:cleanup(Workers) end; diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index 1648623..bc6e15d 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -27,10 +27,11 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) -> DbName = fabric:dbname(Db), Shards = fabric_view:get_shards(Db, Args), + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), - RPCArgs = [DocIdAndRev, View, Args, Options], + RPCArgs = [DocIdAndRev, View, WorkerArgs, Options], StartFun = fun(Shard) -> hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs)) end, @@ -42,7 +43,7 @@ go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try - go(DbName, 
Workers, VInfo, Args, Callback, Acc) + go(DbName, Workers, VInfo, CoordArgs, Callback, Acc) after fabric_util:cleanup(Workers) end; diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index 7acc67c..712ed24 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -25,9 +25,10 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) - go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> DbName = fabric:dbname(Db), - DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), - RPCArgs = [DocIdAndRev, VName, Args], Shards = fabric_view:get_shards(Db, Args), + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), + DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), + RPCArgs = [DocIdAndRev, VName, WorkerArgs], fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), StartFun = fun(Shard) -> @@ -41,7 +42,7 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try - go2(DbName, Workers, VInfo, Args, Callback, Acc) + go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc) after fabric_util:cleanup(Workers) end;
