Repository: couchdb-global-changes Updated Branches: refs/heads/windsor-merge a55e2d228 -> 7e63e9291 (forced update)
Reimplement global_change rate limiting Rather than updating only when a change occurs, we now simply update every max_write_delay milliseconds. Project: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/commit/7e63e929 Tree: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/tree/7e63e929 Diff: http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/diff/7e63e929 Branch: refs/heads/windsor-merge Commit: 7e63e9291815b93011e2c04718bbac9b23d6e322 Parents: 0324bf4 Author: Paul J. Davis <[email protected]> Authored: Wed Aug 13 15:54:17 2014 -0500 Committer: Paul J. Davis <[email protected]> Committed: Wed Aug 13 16:21:33 2014 -0500 ---------------------------------------------------------------------- src/global_changes_listener.erl | 2 +- src/global_changes_server.erl | 99 +++++++++++++++++------------------- 2 files changed, 48 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/7e63e929/src/global_changes_listener.erl ---------------------------------------------------------------------- diff --git a/src/global_changes_listener.erl b/src/global_changes_listener.erl index c25a0d1..9befdfd 100644 --- a/src/global_changes_listener.erl +++ b/src/global_changes_listener.erl @@ -46,7 +46,7 @@ start() -> init(_) -> % get configs as strings UpdateDb0 = config:get("global_changes", "update_db", "true"), - MaxEventDelay0 = config:get("global_changes", "max_event_delay", "500"), + MaxEventDelay0 = config:get("global_changes", "max_event_delay", "25"), % make config strings into other data types UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end, http://git-wip-us.apache.org/repos/asf/couchdb-global-changes/blob/7e63e929/src/global_changes_server.erl ---------------------------------------------------------------------- diff --git 
a/src/global_changes_server.erl b/src/global_changes_server.erl index e6a8e54..1762b76 100644 --- a/src/global_changes_server.erl +++ b/src/global_changes_server.erl @@ -40,7 +40,6 @@ -record(state, { update_db, pending_update_count, - last_update_time, pending_updates, max_write_delay, dbname, @@ -56,12 +55,15 @@ init([]) -> {ok, Handler} = global_changes_listener:start(), % get configs as strings UpdateDb0 = config:get("global_changes", "update_db", "true"), - MaxWriteDelay0 = config:get("global_changes", "max_write_delay", "500"), + MaxWriteDelay0 = config:get("global_changes", "max_write_delay", "25"), % make config strings into other data types UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end, MaxWriteDelay = list_to_integer(MaxWriteDelay0), + % Start our write triggers + erlang:send_after(MaxWriteDelay, self(), flush_updates), + State = #state{ update_db=UpdateDb, pending_update_count=0, @@ -89,11 +91,11 @@ handle_cast({update_docs, DocIds}, State) -> pending_updates=Pending, pending_update_count=sets:size(Pending) }, - maybe_update_docs(NewState); + {noreply, NewState}; handle_cast({set_max_write_delay, MaxWriteDelay}, State) -> NewState = State#state{max_write_delay=MaxWriteDelay}, - maybe_update_docs(NewState); + {noreply, NewState}; handle_cast({set_update_db, Boolean}, State0) -> % If turning update_db off, clear out server state State = case {Boolean, State0#state.update_db} of @@ -101,76 +103,69 @@ handle_cast({set_update_db, Boolean}, State0) -> State0#state{ update_db=Boolean, pending_updates=sets:new(), - pending_update_count=0, - last_update_time=undefined + pending_update_count=0 }; _ -> State0#state{update_db=Boolean} end, - maybe_update_docs(State); + {noreply, State}; handle_cast(_Msg, State) -> - maybe_update_docs(State). + {noreply, State}. 
+handle_info(flush_updates, #state{pending_update_count=0}=State) -> + erlang:send_after(State#state.max_write_delay, self(), flush_updates), + {noreply, State}; +handle_info(flush_updates, #state{update_db=false}=State) -> + erlang:send_after(State#state.max_write_delay, self(), flush_updates), + {noreply, State}; +handle_info(flush_updates, State) -> + erlang:send_after(State#state.max_write_delay, self(), flush_updates), + flush_updates(State); handle_info(start_listener, State) -> {ok, Handler} = global_changes_listener:start(), NewState = State#state{ handler_ref=erlang:monitor(process, Handler) }, - maybe_update_docs(NewState); + {noreply, NewState}; handle_info({'DOWN', Ref, _, _, Reason}, #state{handler_ref=Ref}=State) -> couch_log:error("global_changes_listener terminated: ~w", [Reason]), erlang:send_after(5000, self(), start_listener), - maybe_update_docs(State); + {noreply, State}; handle_info(_, State) -> - maybe_update_docs(State). + {noreply, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}. 
-maybe_update_docs(#state{pending_update_count=0}=State) -> - {noreply, State}; -maybe_update_docs(#state{update_db=true}=State) -> - #state{max_write_delay=MaxWriteDelay, last_update_time=LastUpdateTime} = State, - Now = os:timestamp(), - case LastUpdateTime of - undefined -> - {noreply, State#state{last_update_time=Now}, MaxWriteDelay}; - _ -> - Delta = round(timer:now_diff(Now, LastUpdateTime)/1000), - if Delta >= MaxWriteDelay -> - DocIds = sets:to_list(State#state.pending_updates), - try group_ids_by_shard(State#state.dbname, DocIds) of - GroupedIds -> - Docs = dict:fold(fun(ShardName, Ids, DocInfoAcc) -> - {ok, Shard} = couch_db:open(ShardName, []), - try - GroupedDocs = get_docs_locally(Shard, Ids), - GroupedDocs ++ DocInfoAcc - after - couch_db:close(Shard) - end - end, [], GroupedIds), - - spawn(fun() -> - fabric:update_docs(State#state.dbname, Docs, []) - end) - catch error:database_does_not_exist -> - {noreply, State} - end, - {noreply, State#state{ - pending_updates=sets:new(), - pending_update_count=0, - last_update_time=undefined - }}; - true -> - {noreply, State, MaxWriteDelay-Delta} - end - end; -maybe_update_docs(State) -> - {noreply, State}. + +flush_updates(State) -> + DocIds = sets:to_list(State#state.pending_updates), + try group_ids_by_shard(State#state.dbname, DocIds) of + GroupedIds -> + Docs = dict:fold(fun(ShardName, Ids, DocInfoAcc) -> + {ok, Shard} = couch_db:open(ShardName, []), + try + GroupedDocs = get_docs_locally(Shard, Ids), + GroupedDocs ++ DocInfoAcc + after + couch_db:close(Shard) + end + end, [], GroupedIds), + + spawn(fun() -> + couch_log:error("DOCS ~p", [Docs]), + fabric:update_docs(State#state.dbname, Docs, []) + end) + catch error:database_does_not_exist -> + {noreply, State} + end, + {noreply, State#state{ + pending_updates=sets:new(), + pending_update_count=0 + }}. update_docs(Node, Updates) ->
