This is an automated email from the ASF dual-hosted git repository.
rnewson pushed a commit to branch user-partitioned-dbs-6
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/user-partitioned-dbs-6 by this
push:
new ce3593e Optimize skip for partitioned queries
ce3593e is described below
commit ce3593e70c1963ab3429e9481e95708a4ac38f08
Author: Robert Newson <[email protected]>
AuthorDate: Wed Sep 5 12:30:31 2018 +0100
Optimize skip for partitioned queries
'skip' is implemented efficiently at the worker level, but we've
disabled it for clustered views: a clustered query spans multiple
shards, so the coordinator cannot calculate the right skip value to
pass to each worker. With a partitioned query this problem is gone,
because the value the query specifies is the right value for all
workers (they all hit the same shard range).
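This commit removes the old fix_skip_and_limit function from
fabric_rpc and moves the logic up to the coordinators
(fabric_view_all_docs, fabric_view_map and fabric_view_reduce) via a
new fabric_view:fix_skip_and_limit/1, which returns separate
coordinator and worker arguments.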
---
src/fabric/src/fabric_rpc.erl | 12 +++---------
src/fabric/src/fabric_view.erl | 12 ++++++++++++
src/fabric/src/fabric_view_all_docs.erl | 5 +++--
src/fabric/src/fabric_view_map.erl | 5 +++--
src/fabric/src/fabric_view_reduce.erl | 5 +++--
5 files changed, 24 insertions(+), 15 deletions(-)
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 60526f4..e538a9d 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -96,9 +96,8 @@ changes(DbName, Options, StartVector, DbOptions) ->
all_docs(DbName, Options, Args0) ->
case fabric_util:upgrade_mrargs(Args0) of
- #mrargs{keys=undefined} = Args1 ->
+ #mrargs{keys=undefined} = Args ->
set_io_priority(DbName, Options),
- Args = fix_skip_and_limit(Args1),
{ok, Db} = get_or_create_db(DbName, Options),
VAcc0 = #vacc{db=Db},
couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0)
@@ -122,7 +121,7 @@ map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
map_view(DbName, DDoc, ViewName, Args0, DbOptions);
map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
set_io_priority(DbName, DbOptions),
- Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+ Args = fabric_util:upgrade_mrargs(Args0),
{ok, Db} = get_or_create_db(DbName, DbOptions),
VAcc0 = #vacc{db=Db},
couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0).
@@ -136,16 +135,11 @@ reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
reduce_view(DbName, DDoc, ViewName, Args0, DbOptions);
reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
set_io_priority(DbName, DbOptions),
- Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+ Args = fabric_util:upgrade_mrargs(Args0),
{ok, Db} = get_or_create_db(DbName, DbOptions),
VAcc0 = #vacc{db=Db},
couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
-fix_skip_and_limit(Args) ->
- #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args,
- % the coordinator needs to finalize each row, so make sure the shards don't
- Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}.
-
create_db(DbName) ->
create_db(DbName, []).
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index c0e2974..538da72 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -16,6 +16,7 @@
transform_row/1, keydict/1, extract_view/4, get_shards/2,
check_down_shards/2, handle_worker_exit/3,
get_shard_replacements/2, maybe_update_others/5]).
+-export([fix_skip_and_limit/1]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
@@ -388,6 +389,17 @@ get_shard_replacements(DbName, UsedShards0) ->
end
end, [], UsedShards).
+-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}.
+fix_skip_and_limit(#mrargs{} = Args) ->
+ case couch_mrview_util:get_extra(Args, partitioned) of
+ true ->
+ {Args#mrargs{skip=0}, Args};
+ false ->
+ #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args,
+ %% the coordinator needs to finalize each row, so make sure the shards don't
+ {Args, Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}}
+ end.
+
% unit test
is_progress_possible_test() ->
EndPoint = 2 bsl 31,
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index b12bcde..e8868c5 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -21,15 +21,16 @@
-include_lib("couch_mrview/include/couch_mrview.hrl").
go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
+ {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
Shards = shards(DbName, QueryArgs),
Workers0 = fabric_util:submit_jobs(
- Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
+ Shards, fabric_rpc, all_docs, [Options, WorkerArgs]),
RexiMon = fabric_util:create_monitors(Workers0),
try
case fabric_util:stream_start(Workers0, #shard.ref) of
{ok, Workers} ->
try
- go(DbName, Options, Workers, QueryArgs, Callback, Acc)
+ go(DbName, Options, Workers, CoordArgs, Callback, Acc)
after
fabric_util:cleanup(Workers)
end;
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index b6a3d6f..85e9bec 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -25,11 +25,12 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo)
go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo);
go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
+ {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
Shards = fabric_view:get_shards(DbName, Args),
DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args),
Repls = fabric_view:get_shard_replacements(DbName, Shards),
- RPCArgs = [DocIdAndRev, View, Args, Options],
+ RPCArgs = [DocIdAndRev, View, WorkerArgs, Options],
StartFun = fun(Shard) ->
hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
end,
@@ -41,7 +42,7 @@ go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
try
- go(DbName, Workers, VInfo, Args, Callback, Acc)
+ go(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
after
fabric_util:cleanup(Workers)
end;
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index a74be10..1ea4d1b 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -24,8 +24,9 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
go(DbName, DDoc, View, Args, Callback, Acc0, VInfo);
go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) ->
+ {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
- RPCArgs = [DocIdAndRev, VName, Args],
+ RPCArgs = [DocIdAndRev, VName, WorkerArgs],
Shards = fabric_view:get_shards(DbName, Args),
fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
Repls = fabric_view:get_shard_replacements(DbName, Shards),
@@ -40,7 +41,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) ->
Callback({error, ddoc_updated}, Acc);
{ok, Workers} ->
try
- go2(DbName, Workers, VInfo, Args, Callback, Acc)
+ go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
after
fabric_util:cleanup(Workers)
end;