This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch feature/user-partitioned-databases-davisp
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f6a9317fbeed2d6c4e43f454b5be0b2ebbda42d9
Author: Paul J. Davis <[email protected]>
AuthorDate: Thu Oct 25 14:27:32 2018 -0500

    Optimize offset/limit for partition queries
    
    Now that a single shard handles the entire response we can optimize work
    normally done in the coordinator by moving it to the RPC worker which
    then removes the need to send an extra `skip` number of rows to the
    coordinator.
    
    Co-authored-by: Robert Newson <[email protected]>
---
 src/fabric/src/fabric_rpc.erl           | 12 +++---------
 src/fabric/src/fabric_view.erl          | 16 ++++++++++++++++
 src/fabric/src/fabric_view_all_docs.erl |  5 +++--
 src/fabric/src/fabric_view_map.erl      |  5 +++--
 src/fabric/src/fabric_view_reduce.erl   |  7 ++++---
 5 files changed, 29 insertions(+), 16 deletions(-)

diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 873e0c5..4ba13bb 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -97,9 +97,8 @@ changes(DbName, Options, StartVector, DbOptions) ->
 
 all_docs(DbName, Options, Args0) ->
     case fabric_util:upgrade_mrargs(Args0) of
-        #mrargs{keys=undefined} = Args1 ->
+        #mrargs{keys=undefined} = Args ->
             set_io_priority(DbName, Options),
-            Args = fix_skip_and_limit(Args1),
             {ok, Db} = get_or_create_db(DbName, Options),
             CB = get_view_cb(Args),
             couch_mrview:query_all_docs(Db, Args, CB, Args)
@@ -123,7 +122,7 @@ map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
     map_view(DbName, DDoc, ViewName, Args0, DbOptions);
 map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
-    Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+    Args = fabric_util:upgrade_mrargs(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
     CB = get_view_cb(Args),
     couch_mrview:query_view(Db, DDoc, ViewName, Args, CB, Args).
@@ -137,16 +136,11 @@ reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
     reduce_view(DbName, DDoc, ViewName, Args0, DbOptions);
 reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
-    Args = fix_skip_and_limit(fabric_util:upgrade_mrargs(Args0)),
+    Args = fabric_util:upgrade_mrargs(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
     VAcc0 = #vacc{db=Db},
     couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
 
-fix_skip_and_limit(Args) ->
-    #mrargs{skip=Skip, limit=Limit, extra=Extra}=Args,
-    % the coordinator needs to finalize each row, so make sure the shards don't
-    Args#mrargs{skip=0, limit=Skip+Limit, extra=[{finalizer,null} | Extra]}.
-
 create_db(DbName) ->
     create_db(DbName, []).
 
diff --git a/src/fabric/src/fabric_view.erl b/src/fabric/src/fabric_view.erl
index 81eb6f0..70d6c06 100644
--- a/src/fabric/src/fabric_view.erl
+++ b/src/fabric/src/fabric_view.erl
@@ -16,6 +16,7 @@
     transform_row/1, keydict/1, extract_view/4, get_shards/2,
     check_down_shards/2, handle_worker_exit/3,
     get_shard_replacements/2, maybe_update_others/5]).
+-export([fix_skip_and_limit/1]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -372,6 +373,21 @@ get_shard_replacements(DbName, UsedShards0) ->
         end
     end, [], UsedShards).
 
+-spec fix_skip_and_limit(#mrargs{}) -> {CoordArgs::#mrargs{}, WorkerArgs::#mrargs{}}.
+fix_skip_and_limit(#mrargs{} = Args) ->
+    {CoordArgs, WorkerArgs} = case couch_mrview_util:get_extra(Args, partition) of
+        undefined ->
+            #mrargs{skip=Skip, limit=Limit}=Args,
+            {Args, Args#mrargs{skip=0, limit=Skip+Limit}};
+        _Partition ->
+            {Args#mrargs{skip=0}, Args}
+    end,
+    %% the coordinator needs to finalize each row, so make sure the shards don't
+    {CoordArgs, remove_finalizer(WorkerArgs)}.
+
+remove_finalizer(Args) ->
+    couch_mrview_util:set_extra(Args, finalizer, null).
+
 % unit test
 is_progress_possible_test() ->
     EndPoint = 2 bsl 31,
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl
index 3aaabe4..bae11da 100644
--- a/src/fabric/src/fabric_view_all_docs.erl
+++ b/src/fabric/src/fabric_view_all_docs.erl
@@ -21,16 +21,17 @@
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
 go(Db, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
+    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
     DbName = fabric:dbname(Db),
     Shards = shards(Db, QueryArgs),
     Workers0 = fabric_util:submit_jobs(
-            Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
+            Shards, fabric_rpc, all_docs, [Options, WorkerArgs]),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
         case fabric_util:stream_start(Workers0, #shard.ref) of
             {ok, Workers} ->
                 try
-                    go(DbName, Options, Workers, QueryArgs, Callback, Acc)
+                    go(DbName, Options, Workers, CoordArgs, Callback, Acc)
                 after
                     fabric_util:cleanup(Workers)
                 end;
diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl
index 1648623..bc6e15d 100644
--- a/src/fabric/src/fabric_view_map.erl
+++ b/src/fabric/src/fabric_view_map.erl
@@ -27,10 +27,11 @@ go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo)
 go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
     DbName = fabric:dbname(Db),
     Shards = fabric_view:get_shards(Db, Args),
+    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
     DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, View, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
-    RPCArgs = [DocIdAndRev, View, Args, Options],
+    RPCArgs = [DocIdAndRev, View, WorkerArgs, Options],
     StartFun = fun(Shard) ->
         hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
     end,
@@ -42,7 +43,7 @@ go(Db, Options, DDoc, View, Args, Callback, Acc, VInfo) ->
                 Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
                 try
-                    go(DbName, Workers, VInfo, Args, Callback, Acc)
+                    go(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
                 after
                     fabric_util:cleanup(Workers)
                 end;
diff --git a/src/fabric/src/fabric_view_reduce.erl 
b/src/fabric/src/fabric_view_reduce.erl
index 7acc67c..712ed24 100644
--- a/src/fabric/src/fabric_view_reduce.erl
+++ b/src/fabric/src/fabric_view_reduce.erl
@@ -25,9 +25,10 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
 
 go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
     DbName = fabric:dbname(Db),
-    DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
-    RPCArgs = [DocIdAndRev, VName, Args],
     Shards = fabric_view:get_shards(Db, Args),
+    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
+    DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
+    RPCArgs = [DocIdAndRev, VName, WorkerArgs],
     fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
     StartFun = fun(Shard) ->
@@ -41,7 +42,7 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
                 Callback({error, ddoc_updated}, Acc);
             {ok, Workers} ->
                 try
-                    go2(DbName, Workers, VInfo, Args, Callback, Acc)
+                    go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
                 after
                     fabric_util:cleanup(Workers)
                 end;

Reply via email to