This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch improve-fabric2-event-listener
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit adc709cb11022d9176dd7bebd4b7f5d3d312eb76
Author: Nick Vatamaniuc <[email protected]>
AuthorDate: Tue Mar 24 18:07:07 2020 -0400

    Improve fabric2_events a bit
    
     * Avoid a case clause error in `after 0` when the database is deleted
    
     * Handle db re-creation by checking the instance UUID during `fabric2_db:open/2`
    
    Since we added a few extra arguments, switch to using a map as the state.
---
 src/fabric/src/fabric2_events.erl         | 52 +++++++++++++++++++--------
 src/fabric/test/fabric2_db_misc_tests.erl | 59 ++++++++++++++++++++++++++++++-
 2 files changed, 95 insertions(+), 16 deletions(-)

diff --git a/src/fabric/src/fabric2_events.erl b/src/fabric/src/fabric2_events.erl
index 094ca2f..9d7cb57 100644
--- a/src/fabric/src/fabric2_events.erl
+++ b/src/fabric/src/fabric2_events.erl
@@ -19,17 +19,24 @@
 ]).
 
 -export([
-    init/5,
-    poll/5
+    init/2,
+    poll/1
 ]).
 
 
 -include_lib("couch/include/couch_db.hrl").
 
 
-link_listener(Mod, Fun, St, Options) ->
-    DbName = fabric2_util:get_value(dbname, Options),
-    Pid = spawn_link(?MODULE, init, [self(), DbName, Mod, Fun, St]),
+link_listener(Mod, Fun, Acc, Options) ->
+    State = #{
+        dbname => fabric2_util:get_value(dbname, Options),
+        uuid => fabric2_util:get_value(uuid, Options),
+        timeout => fabric2_util:get_value(timeout, Options),
+        mod => Mod,
+        callback => Fun,
+        acc => Acc
+    },
+    Pid = spawn_link(?MODULE, init, [self(), State]),
     receive
         {Pid, initialized} -> ok
     end,
@@ -40,29 +47,40 @@ stop_listener(Pid) ->
     Pid ! stop_listening.
 
 
-init(Parent, DbName, Mod, Fun, St) ->
+init(Parent, #{dbname := DbName} = State) ->
     {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
     Since = fabric2_db:get_update_seq(Db),
     erlang:monitor(process, Parent),
     Parent ! {self(), initialized},
-    poll(DbName, Since, Mod, Fun, St).
+    poll(State#{since => Since}).
 
 
-poll(DbName, Since, Mod, Fun, St) ->
+poll(#{} = State) ->
+    #{
+        dbname := DbName,
+        uuid := DbUUID,
+        timeout := Timeout,
+        since := Since,
+        mod := Mod,
+        callback := Fun,
+        acc := Acc
+    } = State,
     {Resp, NewSince} = try
-        case fabric2_db:open(DbName, [?ADMIN_CTX]) of
+        Opts = [?ADMIN_CTX, {uuid, DbUUID}],
+        case fabric2_db:open(DbName, Opts) of
             {ok, Db} ->
                 case fabric2_db:get_update_seq(Db) of
                     Since ->
-                        {{ok, St}, Since};
+                        {{ok, Acc}, Since};
                     Other ->
-                        {Mod:Fun(DbName, updated, St), Other}
+                        {Mod:Fun(DbName, updated, Acc), Other}
                 end;
             Error ->
                 exit(Error)
         end
     catch error:database_does_not_exist ->
-        Mod:Fun(DbName, deleted, St)
+        Mod:Fun(DbName, deleted, Acc),
+        {{stop, ok}, Since}
     end,
     receive
         stop_listening ->
@@ -71,9 +89,13 @@ poll(DbName, Since, Mod, Fun, St) ->
             ok
     after 0 ->
         case Resp of
-            {ok, NewSt} ->
-                timer:sleep(1000),
-                ?MODULE:poll(DbName, NewSince, Mod, Fun, NewSt);
+            {ok, NewAcc} ->
+                timer:sleep(Timeout),
+                NewState = State#{
+                    since := NewSince,
+                    acc := NewAcc
+                },
+                ?MODULE:poll(NewState);
             {stop, _} ->
                 ok
         end
diff --git a/src/fabric/test/fabric2_db_misc_tests.erl b/src/fabric/test/fabric2_db_misc_tests.erl
index fe0ae9f..1959982 100644
--- a/src/fabric/test/fabric2_db_misc_tests.erl
+++ b/src/fabric/test/fabric2_db_misc_tests.erl
@@ -13,6 +13,12 @@
 -module(fabric2_db_misc_tests).
 
 
+% Used in events_listener test
+-export([
+    event_listener_callback/3
+]).
+
+
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
@@ -42,7 +48,8 @@ misc_test_() ->
                 ?TDEF(get_full_doc_infos),
                 ?TDEF(ensure_full_commit),
                 ?TDEF(metadata_bump),
-                ?TDEF(db_version_bump)
+                ?TDEF(db_version_bump),
+                ?TDEF(events_listener)
             ])
         }
     }.
@@ -334,3 +341,53 @@ db_version_bump({DbName, _, _}) ->
 
     % Check that db handle in the cache got the new metadata version
     ?assertMatch(#{db_version := NewDbVersion}, Db2).
+
+
+events_listener({DbName, Db, _}) ->
+    Opts = [
+        {dbname, DbName},
+        {uuid, fabric2_db:get_uuid(Db)},
+        {timeout, 100}
+    ],
+
+    Fun = event_listener_callback,
+    {ok, Pid} = fabric2_events:link_listener(?MODULE, Fun, self(), Opts),
+    unlink(Pid),
+    Ref = monitor(process, Pid),
+
+    NextEvent = fun(Timeout) ->
+        receive
+            {Pid, Evt} when is_pid(Pid) -> Evt;
+            {'DOWN', Ref, _, _, normal} -> exited_normal
+        after Timeout ->
+            timeout
+        end
+    end,
+
+    Doc1 = #doc{id = couch_uuids:random()},
+    {ok, _} = fabric2_db:update_doc(Db, Doc1, []),
+    ?assertEqual(updated, NextEvent(1000)),
+
+    % Just one update, then expect a timeout
+    ?assertEqual(timeout, NextEvent(500)),
+
+    Doc2 = #doc{id = couch_uuids:random()},
+    {ok, _} = fabric2_db:update_doc(Db, Doc2, []),
+    ?assertEqual(updated, NextEvent(1000)),
+
+    % Process is still alive
+    ?assert(is_process_alive(Pid)),
+
+    % Recreate db
+    ok = fabric2_db:delete(DbName, [?ADMIN_CTX]),
+    {ok, _} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+    ?assertEqual(deleted, NextEvent(1000)),
+
+    % After db is deleted or re-created listener should die
+    ?assertEqual(exited_normal, NextEvent(1000)).
+
+
+% Callback for event_listener function
+event_listener_callback(_DbName, Event, TestPid) ->
+    TestPid ! {self(), Event},
+    {ok, TestPid}.

Reply via email to