Repository: couchdb-mem3 Updated Branches: refs/heads/COUCHDB-3287-pluggable-storage-engines 24442fb5c -> e2fa78fc6 (forced update)
Fix stale shards cache. There's a race condition in mem3_shards that can result in having shards in the cache for a database that's been deleted. This results in a confused cluster that thinks a database exists until you attempt to open it. The fix is to ignore any cache insert requests that come from an older version of the dbs db than the mem3_shards cache knows about. Big thanks to @jdoane for the identification and original patch. Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/e2fa78fc Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/e2fa78fc Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/e2fa78fc Branch: refs/heads/COUCHDB-3287-pluggable-storage-engines Commit: e2fa78fc6a9d003d36ebfe648cba26c4e0e56765 Parents: 0c9e63e Author: Paul J. Davis <[email protected]> Authored: Fri Feb 24 12:55:37 2017 -0600 Committer: Paul J. Davis <[email protected]> Committed: Wed Mar 22 16:31:00 2017 -0500 ---------------------------------------------------------------------- src/mem3_shards.erl | 54 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/e2fa78fc/src/mem3_shards.erl ---------------------------------------------------------------------- diff --git a/src/mem3_shards.erl b/src/mem3_shards.erl index 11131a1..fb44218 100644 --- a/src/mem3_shards.erl +++ b/src/mem3_shards.erl @@ -27,7 +27,8 @@ -record(st, { max_size = 25000, cur_size = 0, - changes_pid + changes_pid, + update_seq }). -include_lib("mem3/include/mem3.hrl"). 
@@ -180,11 +181,12 @@ init([]) -> ets:new(?ATIMES, [ordered_set, protected, named_table]), ok = config:listen_for_changes(?MODULE, nil), SizeList = config:get("mem3", "shard_cache_size", "25000"), - {Pid, _} = spawn_monitor(fun() -> listen_for_changes(get_update_seq()) end), + UpdateSeq = get_update_seq(), {ok, #st{ max_size = list_to_integer(SizeList), cur_size = 0, - changes_pid = Pid + changes_pid = start_changes_listener(UpdateSeq), + update_seq = UpdateSeq }}. handle_call({set_max_size, Size}, _From, St) -> @@ -199,12 +201,23 @@ handle_cast({cache_hit, DbName}, St) -> couch_stats:increment_counter([mem3, shard_cache, hit]), cache_hit(DbName), {noreply, St}; -handle_cast({cache_insert, DbName, Shards}, St) -> +handle_cast({cache_insert, DbName, Shards, UpdateSeq}, St) -> couch_stats:increment_counter([mem3, shard_cache, miss]), - {noreply, cache_free(cache_insert(St, DbName, Shards))}; + NewSt = case UpdateSeq < St#st.update_seq of + true -> St; + false -> cache_free(cache_insert(St, DbName, Shards)) + end, + {noreply, NewSt}; handle_cast({cache_remove, DbName}, St) -> couch_stats:increment_counter([mem3, shard_cache, eviction]), {noreply, cache_remove(St, DbName)}; +handle_cast({cache_insert_change, DbName, Shards, UpdateSeq}, St) -> + Msg = {cache_insert, DbName, Shards, UpdateSeq}, + {noreply, NewSt} = handle_cast(Msg, St), + {noreply, NewSt#st{update_seq = UpdateSeq}}; +handle_cast({cache_remove_change, DbName, UpdateSeq}, St) -> + {noreply, NewSt} = handle_cast({cache_remove, DbName}, St), + {noreply, NewSt#st{update_seq = UpdateSeq}}; handle_cast(_Msg, St) -> {noreply, St}. 
@@ -221,8 +234,9 @@ handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid}=St) -> erlang:send_after(5000, self(), {start_listener, Seq}), {noreply, NewSt#st{changes_pid=undefined}}; handle_info({start_listener, Seq}, St) -> - {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), - {noreply, St#st{changes_pid=NewPid}}; + {noreply, St#st{ + changes_pid = start_changes_listener(Seq) + }}; handle_info(restart_config_listener, State) -> ok = config:listen_for_changes(?MODULE, nil), {noreply, State}; @@ -238,6 +252,21 @@ code_change(_OldVsn, #st{}=St, _Extra) -> %% internal functions +start_changes_listener(SinceSeq) -> + Self = self(), + {Pid, _} = erlang:spawn_monitor(fun() -> + erlang:spawn_link(fun() -> + Ref = erlang:monitor(process, Self), + receive + {'DOWN', Ref, _, _, _} -> + ok + end, + exit(shutdown) + end), + listen_for_changes(SinceSeq) + end), + Pid. + fold_fun(#full_doc_info{}=FDI, Acc) -> DI = couch_doc:to_doc_info(FDI), fold_fun(DI, Acc); @@ -277,10 +306,11 @@ changes_callback({stop, EndSeq}, _) -> exit({seq, EndSeq}); changes_callback({change, {Change}, _}, _) -> DbName = couch_util:get_value(<<"id">>, Change), + Seq = couch_util:get_value(<<"seq">>, Change), case DbName of <<"_design/", _/binary>> -> ok; _Else -> case mem3_util:is_deleted(Change) of true -> - gen_server:cast(?MODULE, {cache_remove, DbName}); + gen_server:cast(?MODULE, {cache_remove_change, DbName, Seq}); false -> case couch_util:get_value(doc, Change) of {error, Reason} -> @@ -288,13 +318,14 @@ changes_callback({change, {Change}, _}, _) -> [DbName, Reason]); {Doc} -> Shards = mem3_util:build_ordered_shards(DbName, Doc), - gen_server:cast(?MODULE, {cache_insert, DbName, Shards}), + Msg = {cache_insert_change, DbName, Shards, Seq}, + gen_server:cast(?MODULE, Msg), [create_if_missing(mem3:name(S), mem3:engine(S)) || S <- Shards, mem3:node(S) =:= node()] end end end, - {ok, couch_util:get_value(<<"seq">>, Change)}; + {ok, Seq}; changes_callback(timeout, _) -> ok. 
@@ -310,8 +341,9 @@ load_shards_from_disk(DbName) when is_binary(DbName) -> load_shards_from_db(ShardDb, DbName) -> case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of {ok, #doc{body = {Props}}} -> + Seq = couch_db:get_update_seq(ShardDb), Shards = mem3_util:build_ordered_shards(DbName, Props), - gen_server:cast(?MODULE, {cache_insert, DbName, Shards}), + gen_server:cast(?MODULE, {cache_insert, DbName, Shards, Seq}), Shards; {not_found, _} -> erlang:error(database_does_not_exist, ?b2l(DbName))
