This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 27e8e332cf974bff69b86a8d3c5e2f5a6de4125f
Author: Paul J. Davis <[email protected]>
AuthorDate: Fri Nov 13 14:11:00 2020 -0600

    Minimize conflicts while building views
    
    This flips the view indexer to grab the database update_seq outside of
    the update transaction. Previously we would constantly refresh the
    db_seq value on every retry of the transactional loop.
    
    We use a snapshot to get the update_seq so that we don't trigger
    spurious read conflicts with any clients that might be updating the
    database.
---
 src/couch_views/src/couch_views_indexer.erl       | 54 +++++++++++++++++++----
 src/couch_views/test/couch_views_indexer_test.erl | 45 +++++++++++++++++++
 2 files changed, 91 insertions(+), 8 deletions(-)

diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 2735f66..83ccb2c 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -86,15 +86,22 @@ init() ->
         fail_job(Job, Data, sig_changed, "Design document was modified")
     end,
 
+    DbSeq = fabric2_fdb:transactional(Db, fun(TxDb) ->
+        fabric2_fdb:with_snapshot(TxDb, fun(SSDb) ->
+            fabric2_db:get_update_seq(SSDb)
+        end)
+    end),
+
     State = #{
         tx_db => undefined,
         db_uuid => DbUUID,
-        db_seq => undefined,
+        db_seq => DbSeq,
         view_seq => undefined,
         last_seq => undefined,
         view_vs => undefined,
         job => Job,
         job_data => Data,
+        rows_processed => 0,
         count => 0,
         changes_done => 0,
         doc_acc => [],
@@ -206,8 +213,6 @@ do_update(Db, Mrst0, State0) ->
         {ok, State2} = fold_changes(State1),
 
         #{
-            count := Count,
-            limit := Limit,
             doc_acc := DocAcc,
             last_seq := LastSeq,
             view_vs := ViewVS,
@@ -228,7 +233,7 @@ do_update(Db, Mrst0, State0) ->
             total_kvs => TotalKVs
         },
 
-        case Count < Limit of
+        case is_update_finished(State2) of
             true ->
                 maybe_set_build_status(TxDb, Mrst2, ViewVS,
                     ?INDEX_READY),
@@ -249,6 +254,20 @@ do_update(Db, Mrst0, State0) ->
     end).
 
 
+is_update_finished(State) ->
+    #{
+        db_seq := DbSeq,
+        last_seq := LastSeq,
+        view_vs := ViewVs
+    } = State,
+    AtDbSeq = LastSeq == DbSeq,
+    AtViewVs = case ViewVs of
+        not_found -> false;
+        _ -> LastSeq == fabric2_fdb:vs_to_seq(ViewVs)
+    end,
+    AtDbSeq orelse AtViewVs.
+
+
 maybe_set_build_status(_TxDb, _Mrst1, not_found, _State) ->
     ok;
 
@@ -258,7 +277,7 @@ maybe_set_build_status(TxDb, Mrst1, _ViewVS, State) ->
 
 % In the first iteration of update we need
 % to populate our db and view sequences
-get_update_start_state(TxDb, Mrst, #{db_seq := undefined} = State) ->
+get_update_start_state(TxDb, Mrst, #{view_seq := undefined} = State) ->
     #{
         view_vs := ViewVS,
         view_seq := ViewSeq
@@ -266,7 +285,6 @@ get_update_start_state(TxDb, Mrst, #{db_seq := undefined} = State) ->
 
     State#{
         tx_db := TxDb,
-        db_seq := fabric2_db:get_update_seq(TxDb),
         view_vs := ViewVS,
         view_seq := ViewSeq,
         last_seq := ViewSeq
@@ -281,18 +299,36 @@ get_update_start_state(TxDb, _Idx, State) ->
 fold_changes(State) ->
     #{
         view_seq := SinceSeq,
+        db_seq := DbSeq,
         limit := Limit,
         tx_db := TxDb
     } = State,
 
+    FoldState = State#{
+        rows_processed := 0
+    },
+
     Fun = fun process_changes/2,
-    Opts = [{limit, Limit}, {restart_tx, false}],
-    fabric2_db:fold_changes(TxDb, SinceSeq, Fun, State, Opts).
+    Opts = [
+        {end_key, fabric2_fdb:seq_to_vs(DbSeq)},
+        {limit, Limit},
+        {restart_tx, false}
+    ],
+    case fabric2_db:fold_changes(TxDb, SinceSeq, Fun, FoldState, Opts) of
+        {ok, #{rows_processed := 0} = FinalState} when Limit > 0 ->
+            % If we read zero rows with a non-zero limit
+            % it means we've caught up to the DbSeq as our
+            % last_seq.
+            {ok, FinalState#{last_seq := DbSeq}};
+        Result ->
+            Result
+    end.
 
 
 process_changes(Change, Acc) ->
     #{
         doc_acc := DocAcc,
+        rows_processed := RowsProcessed,
         count := Count,
         design_opts := DesignOpts,
         view_vs := ViewVS
@@ -308,12 +344,14 @@ process_changes(Change, Acc) ->
     Acc1 = case {Id, IncludeDesign} of
         {<<?DESIGN_DOC_PREFIX, _/binary>>, false} ->
             maps:merge(Acc, #{
+                rows_processed => RowsProcessed + 1,
                 count => Count + 1,
                 last_seq => LastSeq
             });
         _ ->
             Acc#{
                 doc_acc := DocAcc ++ [Change],
+                rows_processed := RowsProcessed + 1,
                 count := Count + 1,
                 last_seq := LastSeq
             }
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
index 75be245..a0890da 100644
--- a/src/couch_views/test/couch_views_indexer_test.erl
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -52,6 +52,7 @@ indexer_test_() ->
                     ?TDEF_FE(index_autoupdater_callback),
                     ?TDEF_FE(handle_db_recreated_when_running),
                     ?TDEF_FE(handle_db_recreated_after_finished),
+                    ?TDEF_FE(handle_doc_updated_when_running),
                     ?TDEF_FE(index_can_recover_from_crash, 60)
                 ]
             }
@@ -504,6 +505,50 @@ handle_db_recreated_after_finished(Db) ->
     ], Out2).
 
 
+handle_doc_updated_when_running(Db) ->
+    DDoc = create_ddoc(),
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, _} = fabric2_db:update_doc(Db, doc(0), []),
+    {ok, _} = fabric2_db:update_doc(Db, doc(1), []),
+
+    % To intercept job building while it is running ensure updates happen one
+    % row at a time.
+    config:set("couch_views", "batch_initial_size", "1", false),
+
+    meck_intercept_job_update(self()),
+
+    [{ok, JobId}] = couch_views:build_indices(Db, [DDoc]),
+
+    {Indexer, _Job, _Data} = wait_indexer_update(10000),
+
+    {ok, State} = couch_jobs:get_job_state(undefined, ?INDEX_JOB_TYPE, JobId),
+    ?assertEqual(running, State),
+
+    {ok, SubId, running, _} = couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId),
+
+    {ok, Doc} = fabric2_db:open_doc(Db, <<"1">>),
+    Doc2 = Doc#doc {
+        body = {[{<<"val">>, 2}]}
+    },
+    {ok, _} = fabric2_db:update_doc(Db, Doc2),
+
+    reset_intercept_job_update(Indexer),
+    Indexer ! continue,
+
+    ?assertMatch({
+        ?INDEX_JOB_TYPE,
+        JobId,
+        finished,
+        #{<<"active_task_info">> := #{<<"changes_done">> := 1}}
+    }, couch_jobs:wait(SubId, finished, infinity)),
+
+    Args = #mrargs{update = false},
+    {ok, Out2} = couch_views:query(Db, DDoc, ?MAP_FUN1, fun fold_fun/2, [], 
Args),
+    ?assertEqual([
+        row(<<"0">>, 0, 0)
+    ], Out2).
+
+
 index_can_recover_from_crash(Db) ->
     ok = meck:new(config, [passthrough]),
     ok = meck:expect(config, get_integer, fun(Section, Key, Default) ->

Reply via email to