This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch improve-db-props in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 3546804ba5126bec75f2f1d1a7b95eb128869af3 Author: Nick Vatamaniuc <vatam...@gmail.com> AuthorDate: Sat Sep 20 00:44:06 2025 -0400 Cache and store mem3 shard properties in one place only Previously, shard properties were duplicated across all Q*N shards in the cache. If we needed to access them we loaded all the shards (from ets or disk), and then immediately threw them all away except the first one. To optimise and clean up properties, put them in their own ?OPTS ets table. Item lookup, updating, and cleanup mirror the behavior of ?SHARDS. There are a few other related optimisations and cleanups: * In the `for_docid` function we calculated the hash twice: once, when we calculated the `HashKey` for the ets selector, then again, in the `load_shards_from_disk(DbName, DocId)` if we loaded shards from disk. To optimise it, calculate the `HashKey` once and pass it on as `load_shards_from_disk(DbName, HashKey)`. * Previously, we didn't cache the properties for the shards db itself, so add a way to do that. If the shards db changes, the changes feed will restart, and then the shards db properties will update again. These properties may be used in the `_all_docs` call for instance, so having them cached avoids having to load them from disk. * Remove functions which were not used anywhere, and stop exporting functions which are used locally only: `mem3:engine/1`, `find_dirty_shards/0`, `get_engine_opt/1`, `get_props_opt/1`, `get_shard_props/1`. * For mem3 shards and opts ets tables, since there could be multiple pending writers in different processes trying to update different entries, it makes sense to also enable `{write_concurrency, auto}` for those public tables. 
See: https://www.erlang.org/doc/apps/stdlib/ets#new_2_write_concurrency --- src/chttpd/src/chttpd_db.erl | 3 +- src/fabric/src/fabric.erl | 3 +- src/fabric/src/fabric_util.erl | 26 +++--- src/fabric/test/eunit/fabric_bench_test.erl | 2 +- src/mem3/include/mem3.hrl | 4 +- src/mem3/src/mem3.erl | 26 ++---- src/mem3/src/mem3_hash.erl | 28 ++++--- src/mem3/src/mem3_shards.erl | 125 ++++++++++++++++++---------- src/mem3/src/mem3_util.erl | 53 +----------- 9 files changed, 124 insertions(+), 146 deletions(-) diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index 4bab083d4..4e1b75f11 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -454,8 +454,7 @@ delete_db_req(#httpd{} = Req, DbName) -> end. do_db_req(#httpd{path_parts = [DbName | _], user_ctx = Ctx} = Req, Fun) -> - Shard = hd(mem3:shards(DbName)), - Props = couch_util:get_value(props, Shard#shard.opts, []), + Props = mem3:props(DbName), Opts = case Ctx of undefined -> diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl index a2bf82482..0878c4ebc 100644 --- a/src/fabric/src/fabric.erl +++ b/src/fabric/src/fabric.erl @@ -730,8 +730,7 @@ doc(Db0, {_} = Doc) -> true -> Db0; false -> - Shard = hd(mem3:shards(Db0)), - Props = couch_util:get_value(props, Shard#shard.opts, []), + Props = mem3:props(Db0), {ok, Db1} = couch_db:clustered_db(Db0, [{props, Props}]), Db1 end, diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index d0961533f..af292f761 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -296,15 +296,12 @@ is_users_db(DbName) -> path_ends_with(Path, Suffix) -> Suffix =:= couch_db:dbname_suffix(Path). -open_cluster_db(#shard{dbname = DbName, opts = Options}) -> - case couch_util:get_value(props, Options) of - Props when is_list(Props) -> - {ok, Db} = couch_db:clustered_db(DbName, [{props, Props}]), - Db; - _ -> - {ok, Db} = couch_db:clustered_db(DbName, []), - Db - end. 
+open_cluster_db(#shard{dbname = DbName}) -> + open_cluster_db(DbName); +open_cluster_db(DbName) when is_binary(DbName) -> + Props = mem3:props(DbName), + {ok, Db} = couch_db:clustered_db(DbName, [{props, Props}]), + Db. open_cluster_db(DbName, Opts) -> % as admin @@ -320,25 +317,22 @@ kv(Item, Count) -> doc_id_and_rev(#doc{id = DocId, revs = {RevNum, [RevHash | _]}}) -> {DocId, {RevNum, RevHash}}. -is_partitioned(DbName0) when is_binary(DbName0) -> - Shards = mem3:shards(fabric:dbname(DbName0)), - is_partitioned(open_cluster_db(hd(Shards))); +is_partitioned(DbName) when is_binary(DbName) -> + is_partitioned(open_cluster_db(DbName)); is_partitioned(Db) -> couch_db:is_partitioned(Db). validate_all_docs_args(DbName, Args) when is_list(DbName) -> validate_all_docs_args(list_to_binary(DbName), Args); validate_all_docs_args(DbName, Args) when is_binary(DbName) -> - Shards = mem3:shards(fabric:dbname(DbName)), - Db = open_cluster_db(hd(Shards)), + Db = open_cluster_db(DbName), validate_all_docs_args(Db, Args); validate_all_docs_args(Db, Args) -> true = couch_db:is_clustered(Db), couch_mrview_util:validate_all_docs_args(Db, Args). validate_args(DbName, DDoc, Args) when is_binary(DbName) -> - Shards = mem3:shards(fabric:dbname(DbName)), - Db = open_cluster_db(hd(Shards)), + Db = open_cluster_db(DbName), validate_args(Db, DDoc, Args); validate_args(Db, DDoc, Args) -> true = couch_db:is_clustered(Db), diff --git a/src/fabric/test/eunit/fabric_bench_test.erl b/src/fabric/test/eunit/fabric_bench_test.erl index ea514cce8..f055d24da 100644 --- a/src/fabric/test/eunit/fabric_bench_test.erl +++ b/src/fabric/test/eunit/fabric_bench_test.erl @@ -59,7 +59,7 @@ t_old_db_deletion_works(_Ctx) -> % Quick db creation and deletion is racy so % we have to wait until the db is gone before proceeding. 
WaitFun = fun() -> - try mem3_shards:opts_for_db(Db) of + try mem3:props(Db) of _ -> wait catch error:database_does_not_exist -> diff --git a/src/mem3/include/mem3.hrl b/src/mem3/include/mem3.hrl index d97b25469..fa232e840 100644 --- a/src/mem3/include/mem3.hrl +++ b/src/mem3/include/mem3.hrl @@ -22,7 +22,7 @@ dbname :: binary() | 'undefined', range :: [non_neg_integer() | '$1' | '$2'] | '_' | 'undefined', ref :: reference() | '_' | 'undefined', - opts :: list() | 'undefined' + opts = []:: list() | 'undefined' }). %% Do not reference outside of mem3. @@ -33,7 +33,7 @@ range :: [non_neg_integer() | '$1' | '$2'] | '_', ref :: reference() | 'undefined' | '_', order :: non_neg_integer() | 'undefined' | '_', - opts :: list() + opts = []:: list() }). %% types diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl index 5e419ea12..f9978e892 100644 --- a/src/mem3/src/mem3.erl +++ b/src/mem3/src/mem3.erl @@ -18,6 +18,7 @@ restart/0, nodes/0, node_info/2, + props/1, shards/1, shards/2, choose_shards/2, n/1, n/2, @@ -40,7 +41,7 @@ -export([generate_shard_suffix/0]). %% For mem3 use only. --export([name/1, node/1, range/1, engine/1]). +-export([name/1, node/1, range/1]). -include_lib("mem3/include/mem3.hrl"). @@ -115,6 +116,11 @@ nodes() -> node_info(Node, Key) -> mem3_nodes:get_node_info(Node, Key). +-spec props(DbName :: iodata()) -> []. +props(DbName) -> + Opts = mem3_shards:opts_for_db(DbName), + couch_util:get_value(props, Opts, []). + -spec shards(DbName :: iodata()) -> [#shard{}]. shards(DbName) -> shards_int(DbName, []). 
@@ -135,8 +141,7 @@ shards_int(DbName, Options) -> name = ShardDbName, dbname = ShardDbName, range = [0, (2 bsl 31) - 1], - order = undefined, - opts = [] + order = undefined } ]; ShardDbName -> @@ -147,8 +152,7 @@ shards_int(DbName, Options) -> node = config:node_name(), name = ShardDbName, dbname = ShardDbName, - range = [0, (2 bsl 31) - 1], - opts = [] + range = [0, (2 bsl 31) - 1] } ]; _ -> @@ -416,18 +420,6 @@ name(#ordered_shard{name = Name}) -> owner(DbName, DocId, Nodes) -> hd(mem3_util:rotate_list({DbName, DocId}, lists:usort(Nodes))). -engine(#shard{opts = Opts}) -> - engine(Opts); -engine(#ordered_shard{opts = Opts}) -> - engine(Opts); -engine(Opts) when is_list(Opts) -> - case couch_util:get_value(engine, Opts) of - Engine when is_binary(Engine) -> - [{engine, Engine}]; - _ -> - [] - end. - %% Check whether a node is up or down %% side effect: set up a connection to Node if there not yet is one. diff --git a/src/mem3/src/mem3_hash.erl b/src/mem3/src/mem3_hash.erl index 6dfe3f45a..75182518c 100644 --- a/src/mem3/src/mem3_hash.erl +++ b/src/mem3/src/mem3_hash.erl @@ -23,33 +23,35 @@ -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). -calculate(#shard{opts = Opts}, DocId) -> - Props = couch_util:get_value(props, Opts, []), - MFA = get_hash_fun_int(Props), +calculate(#shard{dbname = DbName}, DocId) -> + MFA = get_hash_fun(DbName), calculate(MFA, DocId); -calculate(#ordered_shard{opts = Opts}, DocId) -> - Props = couch_util:get_value(props, Opts, []), - MFA = get_hash_fun_int(Props), +calculate(#ordered_shard{dbname = DbName}, DocId) -> + MFA = get_hash_fun(DbName), calculate(MFA, DocId); calculate(DbName, DocId) when is_binary(DbName) -> MFA = get_hash_fun(DbName), calculate(MFA, DocId); +calculate(Props, DocId) when is_list(Props) -> + MFA = get_hash_fun(Props), + calculate(MFA, DocId); calculate({Mod, Fun, Args}, DocId) -> erlang:apply(Mod, Fun, [DocId | Args]). 
-get_hash_fun(#shard{opts = Opts}) -> - get_hash_fun_int(Opts); -get_hash_fun(#ordered_shard{opts = Opts}) -> - get_hash_fun_int(Opts); +get_hash_fun(#shard{dbname = DbName}) -> + get_hash_fun(DbName); +get_hash_fun(#ordered_shard{dbname = DbName}) -> + get_hash_fun(DbName); get_hash_fun(DbName0) when is_binary(DbName0) -> DbName = mem3:dbname(DbName0), try - [#shard{opts = Opts} | _] = mem3_shards:for_db(DbName), - get_hash_fun_int(couch_util:get_value(props, Opts, [])) + get_hash_fun_int(mem3:props(DbName)) catch error:database_does_not_exist -> {?MODULE, crc32, []} - end. + end; +get_hash_fun(Props) when is_list(Props) -> + get_hash_fun_int(Props). crc32(Item) when is_binary(Item) -> erlang:crc32(Item); diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl index d256d3e90..4550cf32c 100644 --- a/src/mem3/src/mem3_shards.erl +++ b/src/mem3/src/mem3_shards.erl @@ -39,6 +39,7 @@ -define(DBS, mem3_dbs). -define(SHARDS, mem3_shards). +-define(OPTS, mem3_opts). -define(ATIMES, mem3_atimes). -define(OPENERS, mem3_openers). -define(RELISTEN_DELAY, 5000). @@ -46,14 +47,19 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -opts_for_db(DbName0) -> +opts_for_db(DbName) when is_list(DbName) -> + opts_for_db(list_to_binary(DbName)); +opts_for_db(DbName0) when is_binary(DbName0) -> DbName = mem3:dbname(DbName0), - {ok, Db} = mem3_util:ensure_exists(mem3_sync:shards_db()), - case couch_db:open_doc(Db, DbName, [ejson_body]) of - {ok, #doc{body = {Props}}} -> - mem3_util:get_shard_opts(Props); - {not_found, _} -> - erlang:error(database_does_not_exist, [DbName]) + try ets:lookup(?OPTS, DbName) of + [] -> + load_opts_from_disk(DbName); + [{_, Props}] -> + gen_server:cast(?MODULE, {cache_hit, DbName}), + Props + catch + error:badarg -> + load_opts_from_disk(DbName) end. for_db(DbName) -> @@ -80,7 +86,7 @@ for_docid(DbName, DocId) -> for_docid(DbName, DocId, []). 
for_docid(DbName, DocId, Options) -> - HashKey = mem3_hash:calculate(DbName, DocId), + HashKey = mem3_hash:calculate(mem3:props(DbName), DocId), ShardHead = #shard{ dbname = DbName, range = ['$1', '$2'], @@ -97,13 +103,13 @@ for_docid(DbName, DocId, Options) -> Shards = try ets:select(?SHARDS, [ShardSpec, OrderedShardSpec]) of [] -> - load_shards_from_disk(DbName, DocId); + load_shards_from_disk(DbName, HashKey); Else -> gen_server:cast(?MODULE, {cache_hit, DbName}), Else catch error:badarg -> - load_shards_from_disk(DbName, DocId) + load_shards_from_disk(DbName, HashKey) end, case lists:member(ordered, Options) of true -> Shards; @@ -225,13 +231,9 @@ handle_config_terminate(_Server, _Reason, _State) -> init([]) -> couch_util:set_mqd_off_heap(?MODULE), - ets:new(?SHARDS, [ - bag, - public, - named_table, - {keypos, #shard.dbname}, - {read_concurrency, true} - ]), + CacheEtsOpts = [public, named_table, {read_concurrency, true}, {write_concurrency, auto}], + ets:new(?OPTS, CacheEtsOpts), + ets:new(?SHARDS, [bag, {keypos, #shard.dbname}] ++ CacheEtsOpts), ets:new(?DBS, [set, protected, named_table]), ets:new(?ATIMES, [ordered_set, protected, named_table]), ets:new(?OPENERS, [bag, public, named_table]), @@ -239,6 +241,7 @@ init([]) -> SizeList = config:get("mem3", "shard_cache_size", "25000"), WriteTimeout = config:get_integer("mem3", "shard_write_timeout", 1000), DbName = mem3_sync:shards_db(), + cache_shards_db_props(), ioq:set_io_priority({system, DbName}), UpdateSeq = get_update_seq(), {ok, #st{ @@ -304,6 +307,7 @@ handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid = Pid} = St) -> couch_log:notice("~p changes listener died ~p", [?MODULE, Reason]), {St, get_update_seq()} end, + cache_shards_db_props(), erlang:send_after(5000, self(), {start_listener, Seq}), {noreply, NewSt#st{changes_pid = undefined}}; handle_info({start_listener, Seq}, St) -> @@ -361,6 +365,7 @@ listen_for_changes(Since) -> DbName = mem3_sync:shards_db(), ioq:set_io_priority({system, 
DbName}), {ok, Db} = mem3_util:ensure_exists(DbName), + Args = #changes_args{ feed = "continuous", since = Since, @@ -393,10 +398,11 @@ changes_callback({change, {Change}, _}, _) -> ); {Doc} -> Shards = mem3_util:build_ordered_shards(DbName, Doc), + DbOpts = mem3_util:get_shard_opts(Doc), IdleTimeout = config:get_integer( "mem3", "writer_idle_timeout", 30000 ), - Writer = spawn_shard_writer(DbName, Shards, IdleTimeout), + Writer = spawn_shard_writer(DbName, DbOpts, Shards, IdleTimeout), ets:insert(?OPENERS, {DbName, Writer}), Msg = {cache_insert_change, DbName, Writer, Seq}, gen_server:cast(?MODULE, Msg), @@ -417,18 +423,30 @@ load_shards_from_disk(DbName) when is_binary(DbName) -> couch_stats:increment_counter([mem3, shard_cache, miss]), {ok, Db} = mem3_util:ensure_exists(mem3_sync:shards_db()), try - load_shards_from_db(Db, DbName) + {Shards, _DbOpts} = load_from_db(Db, DbName), + Shards after couch_db:close(Db) end. -load_shards_from_db(ShardDb, DbName) -> +load_opts_from_disk(DbName) when is_binary(DbName) -> + couch_stats:increment_counter([mem3, shard_cache, miss]), + {ok, Db} = mem3_util:ensure_exists(mem3_sync:shards_db()), + try + {_Shards, DbOpts} = load_from_db(Db, DbName), + DbOpts + after + couch_db:close(Db) + end. + +load_from_db(ShardDb, DbName) -> case couch_db:open_doc(ShardDb, DbName, [ejson_body]) of {ok, #doc{body = {Props}}} -> Seq = couch_db:get_update_seq(ShardDb), Shards = mem3_util:build_ordered_shards(DbName, Props), + DbOpts = mem3_util:get_shard_opts(Props), IdleTimeout = config:get_integer("mem3", "writer_idle_timeout", 30000), - case maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) of + case maybe_spawn_shard_writer(DbName, DbOpts, Shards, IdleTimeout) of Writer when is_pid(Writer) -> case ets:insert_new(?OPENERS, {DbName, Writer}) of true -> @@ -440,14 +458,13 @@ load_shards_from_db(ShardDb, DbName) -> ignore -> ok end, - Shards; + {Shards, DbOpts}; {not_found, _} -> erlang:error(database_does_not_exist, [DbName]) end. 
-load_shards_from_disk(DbName, DocId) -> +load_shards_from_disk(DbName, HashKey) -> Shards = load_shards_from_disk(DbName), - HashKey = mem3_hash:calculate(hd(Shards), DocId), [S || S <- Shards, in_range(S, HashKey)]. in_range(Shard, HashKey) -> @@ -474,6 +491,7 @@ create_if_missing(ShardName) -> cache_insert(#st{cur_size = Cur} = St, DbName, Writer, Timeout) -> NewATime = couch_util:unique_monotonic_integer(), true = ets:delete(?SHARDS, DbName), + true = ets:delete(?OPTS, DbName), flush_write(DbName, Writer, Timeout), case ets:lookup(?DBS, DbName) of [{DbName, ATime}] -> @@ -489,6 +507,7 @@ cache_insert(#st{cur_size = Cur} = St, DbName, Writer, Timeout) -> cache_remove(#st{cur_size = Cur} = St, DbName) -> true = ets:delete(?SHARDS, DbName), + true = ets:delete(?OPTS, DbName), case ets:lookup(?DBS, DbName) of [{DbName, ATime}] -> true = ets:delete(?DBS, DbName), @@ -515,6 +534,7 @@ cache_free(#st{max_size = Max, cur_size = Cur} = St) when Max =< Cur -> true = ets:delete(?ATIMES, ATime), true = ets:delete(?DBS, DbName), true = ets:delete(?SHARDS, DbName), + true = ets:delete(?OPTS, DbName), cache_free(St#st{cur_size = Cur - 1}); cache_free(St) -> St. @@ -522,15 +542,16 @@ cache_free(St) -> cache_clear(St) -> true = ets:delete_all_objects(?DBS), true = ets:delete_all_objects(?SHARDS), + true = ets:delete_all_objects(?OPTS), true = ets:delete_all_objects(?ATIMES), St#st{cur_size = 0}. -maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) -> +maybe_spawn_shard_writer(DbName, DbOpts, Shards, IdleTimeout) -> try ets:member(?OPENERS, DbName) of true -> ignore; false -> - spawn_shard_writer(DbName, Shards, IdleTimeout) + spawn_shard_writer(DbName, DbOpts, Shards, IdleTimeout) catch error:badarg -> % We might have been called before mem3 finished initializing @@ -539,13 +560,14 @@ maybe_spawn_shard_writer(DbName, Shards, IdleTimeout) -> ignore end. -spawn_shard_writer(DbName, Shards, IdleTimeout) -> - erlang:spawn(fun() -> shard_writer(DbName, Shards, IdleTimeout) end). 
+spawn_shard_writer(DbName, DbOpts, Shards, IdleTimeout) -> + erlang:spawn(fun() -> shard_writer(DbName, DbOpts, Shards, IdleTimeout) end). -shard_writer(DbName, Shards, IdleTimeout) -> +shard_writer(DbName, DbOpts, Shards, IdleTimeout) -> try receive write -> + true = ets:insert(?OPTS, DbOpts), true = ets:insert(?SHARDS, Shards); cancel -> ok @@ -577,6 +599,14 @@ filter_shards_by_range(Range, Shards) -> Shards ). +cache_shards_db_props() -> + DbName = mem3_sync:shards_db(), + {ok, Db} = mem3_util:ensure_exists(DbName), + DbProps = couch_db:get_props(Db), + DbOpts = [{props, DbProps}], + ets:insert(?OPTS, {DbName, DbOpts}), + couch_db:close(Db). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -610,6 +640,7 @@ mem3_shards_test_() -> setup_all() -> ets:new(?SHARDS, [bag, public, named_table, {keypos, #shard.dbname}]), + ets:new(?OPTS, [set, public, named_table, {read_concurrency, true}, {write_concurrency, auto}]), ets:new(?OPENERS, [bag, public, named_table]), ets:new(?DBS, [set, public, named_table]), ets:new(?ATIMES, [ordered_set, public, named_table]), @@ -621,13 +652,15 @@ teardown_all(_) -> ets:delete(?ATIMES), ets:delete(?DBS), ets:delete(?OPENERS), - ets:delete(?SHARDS). + ets:delete(?SHARDS), + ets:delete(?OPTS). setup() -> ets:delete_all_objects(?ATIMES), ets:delete_all_objects(?DBS), ets:delete_all_objects(?OPENERS), - ets:delete_all_objects(?SHARDS). + ets:delete_all_objects(?SHARDS), + ets:delete_all_objects(?OPTS). teardown(_) -> ok. @@ -636,28 +669,30 @@ t_maybe_spawn_shard_writer_already_exists() -> ?_test(begin ets:insert(?OPENERS, {?DB, self()}), Shards = mock_shards(), - WRes = maybe_spawn_shard_writer(?DB, Shards, ?INFINITY), + WRes = maybe_spawn_shard_writer(?DB, [{x, y}], Shards, ?INFINITY), ?assertEqual(ignore, WRes) end). 
t_maybe_spawn_shard_writer_new() -> ?_test(begin Shards = mock_shards(), - WPid = maybe_spawn_shard_writer(?DB, Shards, 1000), + WPid = maybe_spawn_shard_writer(?DB, [{x, y}], Shards, 1000), WRef = erlang:monitor(process, WPid), ?assert(is_pid(WPid)), ?assert(is_process_alive(WPid)), WPid ! write, ?assertEqual(normal, wait_writer_result(WRef)), - ?assertEqual(Shards, ets:tab2list(?SHARDS)) + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([{x, y}], ets:tab2list(?OPTS)) end). t_flush_writer_exists_normal() -> ?_test(begin Shards = mock_shards(), - WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), ?assertEqual(ok, flush_write(?DB, WPid, ?INFINITY)), - ?assertEqual(Shards, ets:tab2list(?SHARDS)) + ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([{x, y}], ets:tab2list(?OPTS)) end). t_flush_writer_times_out() -> @@ -686,19 +721,20 @@ t_flush_writer_crashes() -> t_writer_deletes_itself_when_done() -> ?_test(begin Shards = mock_shards(), - WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), WRef = erlang:monitor(process, WPid), ets:insert(?OPENERS, {?DB, WPid}), WPid ! write, ?assertEqual(normal, wait_writer_result(WRef)), ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([{x, y}], ets:tab2list(?OPTS)), ?assertEqual([], ets:tab2list(?OPENERS)) end). t_writer_does_not_delete_other_writers_for_same_shard() -> ?_test(begin Shards = mock_shards(), - WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), WRef = erlang:monitor(process, WPid), ets:insert(?OPENERS, {?DB, WPid}), % should not be deleted @@ -706,6 +742,7 @@ t_writer_does_not_delete_other_writers_for_same_shard() -> WPid ! 
write, ?assertEqual(normal, wait_writer_result(WRef)), ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([{x, y}], ets:tab2list(?OPTS)), ?assertEqual(1, ets:info(?OPENERS, size)), ?assertEqual([{?DB, self()}], ets:tab2list(?OPENERS)) end). @@ -717,7 +754,7 @@ t_spawn_writer_in_load_shards_from_db() -> meck:expect(mem3_util, build_ordered_shards, 2, mock_shards()), % register to get cache_insert cast erlang:register(?MODULE, self()), - load_shards_from_db(test_util:fake_db([{name, <<"testdb">>}]), ?DB), + load_from_db(test_util:fake_db([{name, <<"testdb">>}]), ?DB), meck:validate(couch_db), meck:validate(mem3_util), Cast = @@ -737,24 +774,26 @@ t_spawn_writer_in_load_shards_from_db() -> t_cache_insert_takes_new_update() -> ?_test(begin Shards = mock_shards(), - WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), Msg = {cache_insert, ?DB, WPid, 2}, {noreply, NewState} = handle_cast(Msg, mock_state(1)), ?assertMatch(#st{cur_size = 1}, NewState), ?assertEqual(Shards, ets:tab2list(?SHARDS)), + ?assertEqual([{x, y}], ets:tab2list(?OPTS)), ?assertEqual([], ets:tab2list(?OPENERS)) end). t_cache_insert_ignores_stale_update_and_kills_worker() -> ?_test(begin Shards = mock_shards(), - WPid = spawn_link_mock_writer(?DB, Shards, ?INFINITY), + WPid = spawn_link_mock_writer(?DB, [{x, y}], Shards, ?INFINITY), WRef = erlang:monitor(process, WPid), Msg = {cache_insert, ?DB, WPid, 1}, {noreply, NewState} = handle_cast(Msg, mock_state(2)), ?assertEqual(normal, wait_writer_result(WRef)), ?assertMatch(#st{cur_size = 0}, NewState), ?assertEqual([], ets:tab2list(?SHARDS)), + ?assertEqual([], ets:tab2list(?OPTS)), ?assertEqual([], ets:tab2list(?OPENERS)) end). @@ -784,8 +823,8 @@ wait_writer_result(WRef) -> timeout end. -spawn_link_mock_writer(Db, Shards, Timeout) -> - erlang:spawn_link(fun() -> shard_writer(Db, Shards, Timeout) end). 
+spawn_link_mock_writer(Db, DbOpts, Shards, Timeout) -> + erlang:spawn_link(fun() -> shard_writer(Db, DbOpts, Shards, Timeout) end). mem3_shards_changes_test_() -> { diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl index aad333b28..f45ee4063 100644 --- a/src/mem3/src/mem3_util.erl +++ b/src/mem3/src/mem3_util.erl @@ -29,8 +29,7 @@ ]). -export([get_or_create_db/2, get_or_create_db_int/2]). -export([is_deleted/1, rotate_list/2]). --export([get_shard_opts/1, get_engine_opt/1, get_props_opt/1]). --export([get_shard_props/1, find_dirty_shards/0]). +-export([get_shard_opts/1]). -export([ iso8601_timestamp/0, live_nodes/0, @@ -230,8 +229,7 @@ build_shards_by_node(DbName, DocProps) -> #shard{ dbname = DbName, node = to_atom(Node), - range = [Beg, End], - opts = get_shard_opts(DocProps) + range = [Beg, End] }, Suffix ) @@ -257,8 +255,7 @@ build_shards_by_range(DbName, DocProps) -> dbname = DbName, node = to_atom(Node), range = [Beg, End], - order = Order, - opts = get_shard_opts(DocProps) + order = Order }, Suffix ) @@ -643,50 +640,6 @@ merge_opts(New, Old) -> New ). -get_shard_props(ShardName) -> - case couch_db:open_int(ShardName, []) of - {ok, Db} -> - Props = - case couch_db_engine:get_props(Db) of - undefined -> []; - Else -> Else - end, - %% We don't normally store the default engine name - EngineProps = - case couch_db_engine:get_engine(Db) of - couch_bt_engine -> - []; - EngineName -> - [{engine, EngineName}] - end, - [{props, Props} | EngineProps]; - {not_found, _} -> - not_found; - Else -> - Else - end. - -find_dirty_shards() -> - mem3_shards:fold( - fun(#shard{node = Node, name = Name, opts = Opts} = Shard, Acc) -> - case Opts of - [] -> - Acc; - [{props, []}] -> - Acc; - _ -> - Props = rpc:call(Node, ?MODULE, get_shard_props, [Name]), - case Props =:= Opts of - true -> - Acc; - false -> - [{Shard, Props} | Acc] - end - end - end, - [] - ). - -ifdef(TEST). -include_lib("eunit/include/eunit.hrl").