This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature/user-partitioned-databases-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 0990a9c6cfc02e4a81c886442600a1e79f23706c
Author: Paul J. Davis <paul.joseph.da...@gmail.com>
AuthorDate: Thu Oct 25 14:27:32 2018 -0500

    Optimize offset/limit for partition queries
    
    Now that a single shard handles the entire response, we can optimize work
    normally done in the coordinator by moving it to the RPC worker, which
    removes the need to send an extra `skip` number of rows to the
    coordinator.
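    
    A minimal sketch of the intended split, assuming the #mrargs{} record
    fields used in the patch below (split_args/2 is an illustrative helper
    name, not part of this commit; the real function is fix_skip_and_limit/1):
    
        %% For a regular query every shard range replies, so each worker must
        %% return Skip + Limit rows and the coordinator drops the first Skip.
        %% For a partition query a single shard serves the whole response, so
        %% the worker applies Skip itself and the coordinator skips nothing.
        split_args(#mrargs{skip = Skip, limit = Limit} = Args, undefined) ->
            {Args, Args#mrargs{skip = 0, limit = Skip + Limit}};
        split_args(#mrargs{} = Args, _Partition) ->
            {Args#mrargs{skip = 0}, Args}.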
    
    Co-authored-by: Robert Newson <rnew...@apache.org>
---
 src/fabric/src/fabric_rpc.erl           | 12 +++---------
 src/fabric/src/fabric_view.erl          | 16 ++++++++++++++++
 src/fabric/src/fabric_view_all_docs.erl |  4 ++--
 src/fabric/src/fabric_view_map.erl      |  5 +++--
 src/fabric/src/fabric_view_reduce.erl   |  7 ++++---
 5 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 873e0c5..4ba13bb 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -97,9 +97,8 @@ changes(DbName, Options, StartVector, DbOptions) ->
 
 all_docs(DbName, Options, Args0) ->
     case fabric_util:upgrade_mrargs(Args0) of
-        #mrargs{keys=undefined} = Args1 ->
+        #mrargs{keys=undefined} = Args ->
             set_io_priority(DbName, Options),
-            Args = fix_skip_and_limit(Args1),
             {ok, Db} = get_or_create_db(DbName, Options),
             CB = get_view_cb(Args),
             couch_mrview:query_all_docs(Db, Args, CB, Args)
@@ -123,7 +122,7 @@ map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
     map_view(DbName, DDoc, ViewName, Args0, DbOptions);
 map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
-    Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+    Args = fabric_util:upgrade_mrargs(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
     CB = get_view_cb(Args),
     couch_mrview:query_view(Db, DDoc, ViewName, Args, CB, Args).
@@ -137,16 +136,11 @@ reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
     reduce_view(DbName, DDoc, ViewName, Args0, DbOptions);
 reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
-    Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+    Args = fabric_util:upgrade_mrargs(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
     VAcc0 = #vacc{db=Db},
     couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
 
-fix_skip_and_limit(Args) ->
-    #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args,
-    % the coordinator needs to finalize each row, so make sure the shards don't
-    Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}.
-
 create_db(DbName) ->
     create_db(DbName, []).
 
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index 69c1e20..8117894 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -16,6 +16,7 @@
     transform_row/1, keydict/1, extract_view/4, get_shards/2,
     check_down_shards/2, handle_worker_exit/3,
     get_shard_replacements/2, maybe_update_others/5]).
+-export([fix_skip_and_limit/1]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -372,6 +373,21 @@ get_shard_replacements(DbName, UsedShards0) ->
         end
     end, [], UsedShards).
 
+-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}.
+fix_skip_and_limit(#mrargs{} = Args) ->
+    {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of
+        undefined ->
+            #mrargs{skip=Skip, limit=Limit}=Args,
+            {Args, Args#mrargs{skip=0, limit=Skip+Limit}};
+        _Partition ->
+            {Args#mrargs{skip=0}, Args}
+    end,
+    %% the coordinator needs to finalize each row, so make sure the shards don't
+    {CoordArgs, remove_finalizer(WorkerArgs)}.
+
+remove_finalizer(Args) ->
+    couch_mrview_util:set_extra(Args, finalizer, null).
+
 % unit test
 is_progress_possible_test() ->
     EndPoint = 2 bsl 31,
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index e3c6dfb..75c53de 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -24,13 +24,13 @@ go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
     DbName = fabric:dbname(Db),
     Shards = shards(Db, QueryArgs),
     Workers0 = fabric_util:submit_jobs(
-            Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
+            Shards, fabric_rpc, all_docs, [Options, WorkerArgs]),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
         case fabric_util:stream_start(Workers0, #shard.ref) of
             {ok, Workers} ->
                 try
-                    go(DbName, Options, Workers, QueryArgs, Callback, Acc)
+                    go(DbName, Options, Workers, CoordArgs, Callback, Acc)
                 after
                     fabric_util:cleanup(Workers)
                 end;
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index 1648623..bc6e15d 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -27,10 +27,11 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo)
 go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
     DbName = fabric:dbname(Db),
     Shards = fabric_view:get_shards(Db, Args),
+    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
     DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
-    RPCArgs = [DocIdAndRev, View, Args, Options],
+    RPCArgs = [DocIdAndRev, View, WorkerArgs, Options],
     StartFun = fun(Shard) ->
         hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
     end,
@@ -42,7 +43,7 @@ go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
                 Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
                 try
-                    go(DbName, Workers, VInfo, Args, Callback, Acc)
+                    go(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
                 after
                     fabric_util:cleanup(Workers)
                 end;
diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl
index 7acc67c..712ed24 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -25,9 +25,10 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) ->
 
 go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
     DbName = fabric:dbname(Db),
-    DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
-    RPCArgs = [DocIdAndRev, VName, Args],
     Shards = fabric_view:get_shards(Db, Args),
+    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
+    DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
+    RPCArgs = [DocIdAndRev, VName, WorkerArgs],
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
     StartFun = fun(Shard) ->
@@ -41,7 +42,7 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
                 Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
                 try
-                    go2(DbName, Workers, VInfo, Args, Callback, Acc)
+                    go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
                 after
                     fabric_util:cleanup(Workers)
                 end;
