This is an automated email from the ASF dual-hosted git repository.

wohali pushed a commit to branch 749-fix-couch_peruser-app-structure
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit f0c28835a6a8d4b08de4eaf94b7d07d2d4865e8c Author: Jan Lehnardt <j...@apache.org> AuthorDate: Sun Oct 8 15:36:31 2017 +0200 make sure peruser listeners are only initialised once per node --- src/couch_peruser/src/couch_peruser.erl | 59 ++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 23 deletions(-) diff --git a/src/couch_peruser/src/couch_peruser.erl b/src/couch_peruser/src/couch_peruser.erl index 791431c..e722b7e 100644 --- a/src/couch_peruser/src/couch_peruser.erl +++ b/src/couch_peruser/src/couch_peruser.erl @@ -32,7 +32,14 @@ cluster_unstable/1 ]). --record(state, {parent, db_name, delete_dbs, changes_pid, changes_ref}). +-record(state, { + parent, + db_name, + delete_dbs, + changes_pid, + changes_ref +}). + -record(clusterState, { parent, db_name, @@ -47,6 +54,9 @@ -define(DEFAULT_QUIET_PERIOD, 60). % seconds -define(DEFAULT_START_PERIOD, 5). % seconds +%% +%% Please leave in the commented-out couch_log:debug calls, thanks! — Jan +%% start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -72,8 +82,6 @@ init() -> {ok, Mem3Cluster} = mem3_cluster:start_link(?MODULE, self(), StartPeriod, Period), - couch_log:debug("peruser: registered for cluster event on node ~p", [node()]), - #clusterState{ parent = self(), db_name = DbName, @@ -86,11 +94,14 @@ init() -> % Cluster membership change notification callback -spec notify_cluster_event(pid(), {cluster, any()}) -> ok. notify_cluster_event(Server, {cluster, _} = Event) -> - couch_log:debug("peruser: received cluster event ~p on node ~p", [Event, node()]), + % couch_log:debug("peruser: received cluster event ~p on node ~p", [Event, node()]), gen_server:cast(Server, Event). 
+start_listening(#clusterState{states=States}=ClusterState) when length(States) > 0 -> + % couch_log:debug("peruser: start_listening() already run on node ~p in pid ~p", [node(), self()]), + ClusterState; start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterState) -> - couch_log:debug("peruser: start_listening() on node ~p", [node()]), + % couch_log:debug("peruser: start_listening() on node ~p", [node()]), try States = lists:map(fun (A) -> S = #state{parent = ClusterState#clusterState.parent, @@ -100,6 +111,7 @@ start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterSta ?MODULE, init_changes_handler, [S], [link, monitor]), S#state{changes_pid=Pid, changes_ref=Ref} end, mem3:local_shards(DbName)), + % couch_log:debug("peruser: start_listening() States ~p", [States]), ClusterState#clusterState{states = States, cluster_stable = true} catch error:database_does_not_exist -> @@ -108,7 +120,6 @@ start_listening(#clusterState{db_name=DbName, delete_dbs=DeleteDbs} = ClusterSta end. init_changes_handler(#state{db_name=DbName} = State) -> - % leave for debugging % couch_log:debug("peruser: init_changes_handler() on DbName ~p", [DbName]), try {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX, sys_db]), @@ -123,7 +134,6 @@ init_changes_handler(#state{db_name=DbName} = State) -> changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{db_name=DbName}) -> - % leave for debugging % couch_log:debug("peruser: changes_handler() on DbName/Doc ~p/~p", [DbName, Doc]), case couch_util:get_value(<<"id">>, Doc) of @@ -155,28 +165,31 @@ changes_handler({change, {Doc}, _Prepend}, _ResType, State=#state{db_name=DbName changes_handler(_Event, _ResType, State) -> State. + should_handle_doc(ShardName, DocId) -> - should_handle_doc_int(ShardName, DocId, is_stable()). 
- -should_handle_doc_int(ShardName, DocId, false) -> - % when the cluster is unstable, we have already stopped all Listeners - % the next stable event will restart all listeners and pick up this - % doc change - couch_log:debug("peruser: skipping, cluster unstable ~s/~s", [ShardName, DocId]), - false; -should_handle_doc_int(ShardName, DocId, true) -> + case is_stable() of + false -> + % when the cluster is unstable, we have already stopped all Listeners + % the next stable event will restart all listeners and pick up this + % doc change + couch_log:debug("peruser: skipping, cluster unstable ~s/~s", [ShardName, DocId]), + false; + true -> + should_handle_doc_int(ShardName, DocId) + end. + +should_handle_doc_int(ShardName, DocId) -> DbName = mem3:dbname(ShardName), Live = [erlang:node() | erlang:nodes()], Shards = mem3:shards(DbName, DocId), Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)], case mem3:owner(DbName, DocId, Nodes) of - ThisNode when ThisNode =:= node() -> - couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]), - % do the deed - true; - _OtherNode -> - couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]), - false + ThisNode when ThisNode =:= node() -> + couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]), + true; % do the database action + _OtherNode -> + couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]), + false end. -- To stop receiving notification emails like this one, please contact "commits@couchdb.apache.org" <commits@couchdb.apache.org>.