This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch user-partitioned-dbs-6
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 2d7e6813c567e47d54168745bafe76a6411e689b
Author: Robert Newson <[email protected]>
AuthorDate: Wed Sep 5 12:30:31 2018 +0100

    Optimize skip for partitioned queries
    
    'skip' is implemented efficiently at the worker level, but we've
    disabled it for clustered views because a query spans multiple shards
    and we can't calculate the right skip value to pass to each worker.
    With a partitioned query this problem goes away: every worker hits the
    same shard range, so the skip value the query specifies is the right
    value for each of them.
    
    This commit removes the old fix_skip_and_limit function from
    fabric_rpc and moves the logic up to the coordinators, via a new
    fabric_view:fix_skip_and_limit/1 that returns separate coordinator
    and worker args.
---
 src/fabric/src/fabric_rpc.erl           | 12 +++---------
 src/fabric/src/fabric_view.erl          | 16 ++++++++++++++++
 src/fabric/src/fabric_view_all_docs.erl |  5 +++--
 src/fabric/src/fabric_view_map.erl      |  5 +++--
 src/fabric/src/fabric_view_reduce.erl   |  5 +++--
 5 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 60526f4..e538a9d 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -96,9 +96,8 @@ changes(DbName, Options, StartVector, DbOptions) ->
 
 all_docs(DbName, Options, Args0) ->
     case fabric_util:upgrade_mrargs(Args0) of
-        #mrargs{keys=undefined} = Args1 ->
+        #mrargs{keys=undefined} = Args ->
             set_io_priority(DbName, Options),
-            Args = fix_skip_and_limit(Args1),
             {ok, Db} = get_or_create_db(DbName, Options),
             VAcc0 = #vacc{db=Db},
             couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0)
@@ -122,7 +121,7 @@ map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
     map_view(DbName, DDoc, ViewName, Args0, DbOptions);
 map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
-    Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+    Args = fabric_util:upgrade_mrargs(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
     VAcc0 = #vacc{db=Db},
     couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0).
@@ -136,16 +135,11 @@ reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
     reduce_view(DbName, DDoc, ViewName, Args0, DbOptions);
 reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
-    Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+    Args = fabric_util:upgrade_mrargs(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
     VAcc0 = #vacc{db=Db},
     couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
 
-fix_skip_and_limit(Args) ->
-    #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args,
-    % the coordinator needs to finalize each row, so make sure the shards don't
-    Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}.
-
 create_db(DbName) ->
     create_db(DbName, []).
 
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index c0e2974..de374cd 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -16,6 +16,7 @@
     transform_row/1, keydict/1, extract_view/4, get_shards/2,
     check_down_shards/2, handle_worker_exit/3,
     get_shard_replacements/2, maybe_update_others/5]).
+-export([fix_skip_and_limit/1]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -388,6 +389,27 @@ get_shard_replacements(DbName, UsedShards0) ->
         end
     end, [], UsedShards).
 
+%% Split query args into coordinator args and per-worker RPC args.
+%% Unpartitioned queries span several shard ranges, so workers get skip=0
+%% and limit=Skip+Limit while the coordinator keeps the requested values.
+%% Partitioned queries hit a single shard range, so workers apply the
+%% requested skip themselves and the coordinator gets skip=0 so the same
+%% rows are not skipped twice. Worker args always have the finalizer off.
+-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}.
+fix_skip_and_limit(#mrargs{} = Args) ->
+    {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of
+        undefined ->
+            #mrargs{skip=Skip, limit=Limit}=Args,
+            {Args, Args#mrargs{skip=0, limit=Skip+Limit}};
+        _Partition ->
+            {Args#mrargs{skip=0}, Args}
+    end,
+    %% the coordinator finalizes each row itself, so the workers must not
+    {CoordArgs, remove_finalizer(WorkerArgs)}.
+
+remove_finalizer(Args) ->
+    couch_mrview_util:set_extra(Args, finalizer, null).
+
 % unit test
 is_progress_possible_test() ->
     EndPoint = 2 bsl 31,
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index b12bcde..e8868c5 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -21,15 +21,16 @@
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
 go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
+    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
     Shards = shards(DbName, QueryArgs),
     Workers0 = fabric_util:submit_jobs(
-            Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
+            Shards, fabric_rpc, all_docs, [Options, WorkerArgs]),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
         case fabric_util:stream_start(Workers0, #shard.ref) of
             {ok, Workers} ->
                 try
-                    go(DbName, Options, Workers, QueryArgs, Callback, Acc)
+                    go(DbName, Options, Workers, CoordArgs, Callback, Acc)
                 after
                     fabric_util:cleanup(Workers)
                 end;
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index b6a3d6f..85e9bec 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -25,11 +25,12 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo)
     go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo);
 
 go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
+    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
     Shards = fabric_view:get_shards(DbName, Args),
     DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
-    RPCArgs = [DocIdAndRev, View, Args, Options],
+    RPCArgs = [DocIdAndRev, View, WorkerArgs, Options],
     StartFun = fun(Shard) ->
         hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
     end,
@@ -41,7 +42,7 @@ go(DbName, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
                 Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
                 try
-                    go(DbName, Workers, VInfo, Args, Callback, Acc)
+                    go(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
                 after
                     fabric_util:cleanup(Workers)
                 end;
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index a74be10..1ea4d1b 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -24,8 +24,9 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
     go(DbName, DDoc, View, Args, Callback, Acc0, VInfo);
 
 go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) ->
+    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
     DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
-    RPCArgs = [DocIdAndRev, VName, Args],
+    RPCArgs = [DocIdAndRev, VName, WorkerArgs],
     Shards = fabric_view:get_shards(DbName, Args),
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
@@ -40,7 +41,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc, VInfo) ->
                 Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
                 try
-                    go2(DbName, Workers, VInfo, Args, Callback, Acc)
+                    go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
                 after
                     fabric_util:cleanup(Workers)
                 end;

Reply via email to