This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch check-db-instance-in-indices
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 9ae55e77ed63573e720cf9e9c48bedea3718ebd7
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Mon Mar 23 16:19:16 2020 -0400

    Handle db re-creation in view indexing
    
    Add the db instance id to indexing job data. During indexing ensure the
    database is opened with the `{uuid, DbUUID}` option. After that any stale db
    reads in `update/3` will throw the `database_does_not_exist` error.
    
    In addition, when the indexing job is re-submitted in `build_view_async/2`,
    check if it contains a reference to an old db instance id and replace the job.
    That has to happen since couch_jobs doesn't overwrite job data for running
    jobs.
---
 src/couch_views/src/couch_views_indexer.erl       |  51 ++++++-----
 src/couch_views/src/couch_views_jobs.erl          |  15 ++-
 src/couch_views/test/couch_views_indexer_test.erl | 107 +++++++++++++++++++++-
 3 files changed, 146 insertions(+), 27 deletions(-)

diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 04dbcf8..802b13f 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -46,45 +46,34 @@ init() ->
     Data = upgrade_data(Data0),
     #{
         <<"db_name">> := DbName,
+        <<"db_uuid">> := DbUUID,
         <<"ddoc_id">> := DDocId,
         <<"sig">> := JobSig,
         <<"retries">> := Retries
     } = Data,
-
     {ok, Db} = try
-        fabric2_db:open(DbName, [?ADMIN_CTX])
+        fabric2_db:open(DbName, [?ADMIN_CTX, {uuid, DbUUID}])
     catch error:database_does_not_exist ->
-        couch_jobs:finish(undefined, Job, Data#{
-            error => db_deleted,
-            reason => "Database was deleted"
-        }),
-        exit(normal)
+        fail_job(Job, Data, db_deleted, "Database was deleted")
     end,
 
     {ok, DDoc} = case fabric2_db:open_doc(Db, DDocId) of
         {ok, DDoc0} ->
             {ok, DDoc0};
         {not_found, _} ->
-            couch_jobs:finish(undefined, Job, Data#{
-                error => ddoc_deleted,
-                reason => "Design document was deleted"
-            }),
-            exit(normal)
+            fail_job(Job, Data, ddoc_deleted, "Design document was deleted")
     end,
 
     {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
     HexSig = fabric2_util:to_hex(Mrst#mrst.sig),
 
-    if  HexSig == JobSig -> ok; true ->
-        couch_jobs:finish(undefined, Job, Data#{
-            error => sig_changed,
-            reason => <<"Design document was modified">>
-        }),
-        exit(normal)
+    if HexSig == JobSig -> ok; true ->
+        fail_job(Job, Data, sig_changed, "Design document was modified")
     end,
 
     State = #{
         tx_db => undefined,
+        db_uuid => DbUUID,
         db_seq => undefined,
         view_seq => undefined,
         last_seq => undefined,
@@ -101,6 +90,8 @@ init() ->
     catch
         exit:normal ->
             ok;
+        error:database_does_not_exist ->
+            fail_job(Job, Data, db_deleted, "Database was deleted");
         Error:Reason  ->
             NewRetry = Retries + 1,
             RetryLimit = retry_limit(),
@@ -115,17 +106,19 @@ init() ->
                     StateErr = State#{job_data := DataErr, last_seq := <<"0">>},
                     report_progress(StateErr, update);
                 false ->
-                    NewData = add_error(Error, Reason, Data),
-                    couch_jobs:finish(undefined, Job, NewData),
-                    exit(normal)
+                    fail_job(Job, Data, Error, Reason)
             end
     end.
 
 
-upgrade_data(Data) ->
-    case maps:is_key(<<"retries">>, Data) of
-        true -> Data;
-        false -> Data#{<<"retries">> =>0}
+upgrade_data(Data0) ->
+    Data1 = case maps:is_key(<<"retries">>, Data0) of
+        true -> Data0;
+        false -> Data0#{<<"retries">> => 0}
+    end,
+    case maps:is_key(<<"db_uuid">>, Data1) of
+        true -> Data1;
+        false -> Data1#{<<"db_uuid">> => undefined}
     end.
 
 
@@ -433,6 +426,7 @@ report_progress(State, UpdateType) ->
 
     #{
         <<"db_name">> := DbName,
+        <<"db_uuid">> := DbUUID,
         <<"ddoc_id">> := DDocId,
         <<"sig">> := Sig,
         <<"retries">> := Retries
@@ -442,6 +436,7 @@ report_progress(State, UpdateType) ->
     % possible existing error state.
     NewData = #{
         <<"db_name">> => DbName,
+        <<"db_uuid">> => DbUUID,
         <<"ddoc_id">> => DDocId,
         <<"sig">> => Sig,
         <<"view_seq">> => LastSeq,
@@ -468,6 +463,12 @@ report_progress(State, UpdateType) ->
     end.
 
 
+fail_job(Job, Data, Error, Reason) ->
+    NewData = add_error(Error, Reason, Data),
+    couch_jobs:finish(undefined, Job, NewData),
+    exit(normal).
+
+
 num_changes() ->
     config:get_integer("couch_views", "change_limit", 100).
 
diff --git a/src/couch_views/src/couch_views_jobs.erl b/src/couch_views/src/couch_views_jobs.erl
index 937146c..fcfa19c 100644
--- a/src/couch_views/src/couch_views_jobs.erl
+++ b/src/couch_views/src/couch_views_jobs.erl
@@ -43,7 +43,19 @@ build_view(TxDb, Mrst, UpdateSeq) ->
 build_view_async(TxDb, Mrst) ->
     JobId = job_id(TxDb, Mrst),
     JobData = job_data(TxDb, Mrst),
-    ok = couch_jobs:add(undefined, ?INDEX_JOB_TYPE, JobId, JobData),
+    DbUUID = fabric2_db:get_uuid(TxDb),
+    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
+        case couch_jobs:get_job_data(JTx, ?INDEX_JOB_TYPE, JobId) of
+            {error, not_found} ->
+                ok;
+            {ok, #{} = OldJobData} ->
+                case maps:get(<<"db_uuid">>, OldJobData) of
+                    DbUUID -> ok;
+                    _ -> couch_jobs:remove(JTx, ?INDEX_JOB_TYPE, JobId)
+                end
+        end,
+        ok = couch_jobs:add(JTx, ?INDEX_JOB_TYPE, JobId, JobData)
+    end),
     {ok, JobId}.
 
 
@@ -95,6 +107,7 @@ job_data(Db, Mrst) ->
 
     #{
         db_name => fabric2_db:name(Db),
+        db_uuid => fabric2_db:get_uuid(Db),
         ddoc_id => DDocId,
         sig => fabric2_util:to_hex(Sig),
         retries => 0
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
index 5475cf6..73cbe32 100644
--- a/src/couch_views/test/couch_views_indexer_test.erl
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -16,6 +16,7 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("couch_views/include/couch_views.hrl").
 -include_lib("fabric/test/fabric2_test.hrl").
 
 
@@ -47,7 +48,9 @@ indexer_test_() ->
                     ?TDEF_FE(fewer_multipe_identical_keys_from_same_doc),
                     ?TDEF_FE(handle_size_key_limits),
                     ?TDEF_FE(handle_size_value_limits),
-                    ?TDEF_FE(index_autoupdater_callback)
+                    ?TDEF_FE(index_autoupdater_callback),
+                    ?TDEF_FE(handle_db_recreated_when_running),
+                    ?TDEF_FE(handle_db_recreated_when_finished)
                 ]
             }
         }
@@ -75,6 +78,7 @@ foreach_setup() ->
 
 foreach_teardown(Db) ->
     meck:unload(),
+    config:delete("couch_views", "change_limit"),
     ok = fabric2_db:delete(fabric2_db:name(Db), []).
 
 
@@ -372,6 +376,83 @@ index_autoupdater_callback(Db) ->
     ?assertEqual(ok, couch_views_jobs:wait_for_job(JobId, DbSeq)).
 
 
+handle_db_recreated_when_running(Db) ->
+    DbName = fabric2_db:name(Db),
+
+    DDoc = create_ddoc(),
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, _} = fabric2_db:update_doc(Db, doc(0), []),
+    {ok, _} = fabric2_db:update_doc(Db, doc(1), []),
+
+    % To intercept job building while it is running ensure updates happen one
+    % row at a time.
+    config:set("couch_views", "change_limit", "1", false),
+
+    meck_intercept_job_update(self()),
+
+    % Start async index building, reuse fabric2_indexer's callback since it
+    % conveniently returns job ids
+    [{ok, JobId}] = couch_views:build_indices(Db, [DDoc]),
+
+    {ok, SubId, _, _} = couch_jobs:subscribe(?INDEX_JOB_TYPE, JobId),
+
+    {Indexer, _Job, _Data} = wait_indexer_update(10000),
+
+    {ok, State} = couch_jobs:get_job_state(undefined, ?INDEX_JOB_TYPE, JobId),
+    ?assertEqual(running, State),
+
+    ok = fabric2_db:delete(DbName, []),
+    {ok, Db1} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+
+    {ok, _} = fabric2_db:update_doc(Db1, DDoc, []),
+    {ok, _} = fabric2_db:update_doc(Db1, doc(2), []),
+    {ok, _} = fabric2_db:update_doc(Db1, doc(3), []),
+
+    reset_intercept_job_update(Indexer),
+
+    {?INDEX_JOB_TYPE, _, _, JobData} = couch_jobs:wait(SubId, finished, 5000),
+    ?assertMatch(#{<<"error">> := <<"db_deleted">>}, JobData),
+
+    {ok, Out2} = run_query(Db1, DDoc, ?MAP_FUN1),
+    ?assertEqual([
+        row(<<"2">>, 2, 2),
+        row(<<"3">>, 3, 3)
+    ], Out2).
+
+
+handle_db_recreated_when_finished(Db) ->
+    DbName = fabric2_db:name(Db),
+
+    DDoc = create_ddoc(),
+    {ok, _} = fabric2_db:update_doc(Db, DDoc, []),
+    {ok, _} = fabric2_db:update_doc(Db, doc(0), []),
+    {ok, _} = fabric2_db:update_doc(Db, doc(1), []),
+
+    {ok, Out1} = run_query(Db, DDoc, ?MAP_FUN1),
+    ?assertEqual([
+        row(<<"0">>, 0, 0),
+        row(<<"1">>, 1, 1)
+    ], Out1),
+
+    ok = fabric2_db:delete(DbName, []),
+
+    ?assertError(database_does_not_exist, run_query(Db, DDoc, ?MAP_FUN1)),
+
+    {ok, Db1} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+
+    {ok, _} = fabric2_db:update_doc(Db1, DDoc, []),
+    {ok, _} = fabric2_db:update_doc(Db1, doc(2), []),
+    {ok, _} = fabric2_db:update_doc(Db1, doc(3), []),
+
+    ?assertError(database_does_not_exist, run_query(Db, DDoc, ?MAP_FUN1)),
+
+    {ok, Out2} = run_query(Db1, DDoc, ?MAP_FUN1),
+    ?assertEqual([
+        row(<<"2">>, 2, 2),
+        row(<<"3">>, 3, 3)
+    ], Out2).
+
+
 row(Id, Key, Value) ->
     {row, [
         {id, Id},
@@ -480,3 +561,27 @@ doc(Id, Val) ->
 
 run_query(#{} = Db, DDoc, <<_/binary>> = View) ->
     couch_views:query(Db, DDoc, View, fun fold_fun/2, [], #mrargs{}).
+
+
+meck_intercept_job_update(ParentPid) ->
+    meck:new(couch_jobs, [passthrough]),
+    meck:expect(couch_jobs, update, fun(Db, Job, Data) ->
+        ParentPid ! {self(), Job, Data},
+        receive continue -> ok end,
+        meck:passthrough([Db, Job, Data])
+    end).
+
+
+reset_intercept_job_update(IndexerPid) ->
+    meck:expect(couch_jobs, update, fun(Db, Job, Data) ->
+        meck:passthrough([Db, Job, Data])
+    end),
+    IndexerPid ! continue.
+
+
+wait_indexer_update(Timeout) ->
+    receive
+        {Pid, Job, Data} when is_pid(Pid) -> {Pid, Job, Data}
+    after Timeout ->
+        error(timeout_in_wait_indexer_update)
+    end.

Reply via email to