This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch user-partitioned-dbs-6 in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 2d7e6813c567e47d54168745bafe76a6411e689b Author: Robert Newson <[email protected]> AuthorDate: Wed Sep 5 12:30:31 2018 +0100 Optimize skip for partitioned queries 'skip' is implemented efficiently at the worker level but we've disabled it for clustered views because of the multiple shards (and not being able to calculate the right skip value to pass to each worker). With a partitioned query, this problem is gone, as the value the query specifies will be the right value for all workers (as they hit the same shard range). This commit removes the old fix_skip_and_limit function from fabric_rpc and moves the logic up to the coordinators. --- src/fabric/src/fabric_rpc.erl | 12 +++--------- src/fabric/src/fabric_view.erl | 16 ++++++++++++++++ src/fabric/src/fabric_view_all_docs.erl | 5 +++-- src/fabric/src/fabric_view_map.erl | 5 +++-- src/fabric/src/fabric_view_reduce.erl | 5 +++-- 5 files changed, 28 insertions(+), 15 deletions(-) diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 60526f4..e538a9d 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -96,9 +96,8 @@ changes(DbName, Options, StartVector, DbOptions) -> all_docs(DbName, Options, Args0) -> case fabric_util:upgrade_mrargs(Args0) of - #mrargs{keys=undefined} = Args1 -> + #mrargs{keys=undefined} = Args -> set_io_priority(DbName, Options), - Args = fix_skip_and_limit(Args1), {ok, Db} = get_or_create_db(DbName, Options), VAcc0 = #vacc{db=Db}, couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0) @@ -122,7 +121,7 @@ map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> map_view(DbName, DDoc, ViewName, Args0, DbOptions); map_view(DbName, DDoc, ViewName, Args0, DbOptions) -> set_io_priority(DbName, DbOptions), - Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)), + Args = fabric_util:upgrade_mrargs(Args0), {ok, Db} = get_or_create_db(DbName, DbOptions), VAcc0 = #vacc{db=Db}, couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0). @@ -136,16 +135,11 @@ reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> reduce_view(DbName, DDoc, ViewName, Args0, DbOptions); reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) -> set_io_priority(DbName, DbOptions), - Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)), + Args = fabric_util:upgrade_mrargs(Args0), {ok, Db} = get_or_create_db(DbName, DbOptions), VAcc0 = #vacc{db=Db}, couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0). -fix_skip_and_limit(Args) -> - #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args, - % the coordinator needs to finalize each row, so make sure the shards don't - Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}. - create_db(DbName) -> create_db(DbName, []). diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl index c0e2974..de374cd 100644 --- a/src/fabric/src/fabric_view.erl +++ b/src/fabric/src/fabric_view.erl @@ -16,6 +16,7 @@ transform_row/1, keydict/1, extract_view/4, get_shards/2, check_down_shards/2, handle_worker_exit/3, get_shard_replacements/2, maybe_update_others/5]). +-export([fix_skip_and_limit/1]). -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). @@ -388,6 +389,21 @@ get_shard_replacements(DbName, UsedShards0) -> end end, [], UsedShards). +-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}. +fix_skip_and_limit(#mrargs{} = Args) -> + {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of + undefined -> + #mrargs{skip=Skip, limit=Limit}=Args, + {Args, Args#mrargs{skip=0, limit=Skip+Limit}}; + _Partition -> + {Args#mrargs{skip=0}, Args} + end, + %% the coordinator needs to finalize each row, so make sure the shards don't + {CoordArgs, remove_finalizer(WorkerArgs)}. + +remove_finalizer(Args) -> + couch_mrview_util:set_extra(Args, finalizer, null). + % unit test is_progress_possible_test() -> EndPoint = 2 bsl 31, diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index b12bcde..e8868c5 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -21,15 +21,16 @@ -include_lib("couch_mrview/include/couch_mrview.hrl"). go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) -> + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs), Shards = shards(DbName, QueryArgs), Workers0 = fabric_util:submit_jobs( - Shards, fabric_rpc, all_docs, [Options, QueryArgs]), + Shards, fabric_rpc, all_docs, [Options, WorkerArgs]), RexiMon = fabric_util:create_monitors(Workers0), try case fabric_util:stream_start(Workers0, #shard.ref) of {ok, Workers} -> try - go(DbName, Options, Workers, QueryArgs, Callback, Acc) + go(DbName, Options, Workers, CoordArgs, Callback, Acc) after fabric_util:cleanup(Workers) end; diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index b6a3d6f..85e9bec 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -25,11 +25,12 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo); go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) -> + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), Shards = fabric_view:get_shards(DbName, Args), DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), - RPCArgs = [DocIdAndRev, View, Args, Options], + RPCArgs = [DocIdAndRev, View, WorkerArgs, Options], StartFun = fun(Shard) -> hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs)) end, @@ -41,7 +42,7 @@ go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try - go(DbName, Workers, VInfo, Args, Callback, Acc) + go(DbName, Workers, VInfo, CoordArgs, Callback, Acc) after fabric_util:cleanup(Workers) end; diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index a74be10..1ea4d1b 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -24,8 +24,9 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) - go(DbName, DDoc, View, Args, Callback, Acc0, VInfo); go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) -> + {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), - RPCArgs = [DocIdAndRev, VName, Args], + RPCArgs = [DocIdAndRev, VName, WorkerArgs], Shards = fabric_view:get_shards(DbName, Args), fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args), Repls = fabric_view:get_shard_replacements(DbName, Shards), @@ -40,7 +41,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) -> Callback({error, ddoc_updated}, Acc); {ok, Workers} -> try - go2(DbName, Workers, VInfo, Args, Callback, Acc) + go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc) after fabric_util:cleanup(Workers) end;
