This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/main by this push:
     new 4c708e403  BTree engine term cache
4c708e403 is described below

commit 4c708e403f52872dbe5fcea9c4f016f49f1bbe0a
Author: Nick Vatamaniuc <vatam...@gmail.com>
AuthorDate: Mon Aug 18 18:30:29 2025 -0400

    BTree engine term cache

Cache the top b-tree nodes and header terms. In order to get or update any kv
leaf nodes in the b-trees, we always have to read the top few kp nodes closest
to the root. Even with parallel preads and the data already in the page cache,
the cost of system calls, marshaling terms, and going through the Erlang IO
system can add up.

The cache is limited in size. The size defaults to 64MB and is configurable by
the user. If the cache is full, no more entries will be added until space
frees up.

Since multiple processes access the data concurrently, the ets tables are
sharded by the number of schedulers, not unlike how we shard our couch_server
processes. Each ets table has an associated cleaner process which evicts
unused entries. Cleaners traverse table entries and "decay" the usage counters
exponentially by repeatedly shifting the value to the right with the `bsr`
operator. Entries with a usage counter equal to 0 are then removed.
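For illustration only, here is a condensed, standalone sketch of that
decay-and-evict pass (the module and function names are made up for the
example; the committed cleaner in couch_bt_engine_cache.erl further down in
the diff additionally checks the table size against the configured limit,
sleeps with jitter between passes, and cleans up entries for dead pids):

```
%% Illustrative sketch, not the committed code. Cache entries are
%% {Key, UsageCounter, Term} tuples in a public ets set.
-module(cache_decay_sketch).
-include_lib("stdlib/include/ms_transform.hrl").
-export([decay_and_evict/1]).

decay_and_evict(Tid) ->
    %% Halve every non-zero usage counter with a right shift (bsr 1).
    Decay = ets:fun2ms(
        fun({Key, Usage, Term}) when Usage > 0 -> {Key, Usage bsr 1, Term} end
    ),
    ets:select_replace(Tid, Decay),
    %% Entries whose counter has decayed to 0 are removed.
    ets:match_delete(Tid, {'_', 0, '_'}).
```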
In order to insert only the top nodes into the cache, the couch_btree module's
lookup and streaming functions were updated with a `depth` parameter. That way
we avoid thrashing the cache with nodes which are less likely to be reused: if
the depth is greater than 3 (the default `db_btree_cache_depth`) or the node is
a kv_node, the cache is skipped altogether.

To help tune the cache size for various workloads, there are three new metrics
counting cache hits, misses and full-cache events:

```
% http $DB/_node/_local/_stats/couchdb/bt_engine_cache
{
    "full": {
        "desc": "number of times bt_engine cache was full",
        "type": "counter",
        "value": 0
    },
    "hits": {
        "desc": "number of bt_engine cache hits",
        "type": "counter",
        "value": 233296
    },
    "misses": {
        "desc": "number of bt_engine cache misses",
        "type": "counter",
        "value": 7343
    }
}
```

Those are the metrics after running
`fabric_bench:go(#{q=>8, doc_size=>small, docs=>100000})`.

A quick and dirty comparison with main:

```
> fabric_bench:go(#{q=>8, doc_size=>small, docs=>100000}).
*** Parameters
 * batch_size : 1000
 * doc_size : small
 * docs : 100000
 * individual_docs : 1000
 * n : default
 * q : 8

*** Environment
 * Nodes : 1
 * Bench ver. : 1
 * N : 1
 * Q : 8
 * OS : unix/darwin
 * Couch ver. : 3.5.0-d16fc1f
 * Couch git sha: d16fc1f
 * VM details : Erlang/OTP 26 [erts-14.2.5.11] [source] [64-bit] [smp:12:12] [ds:12:12:16] [async-threads:1]

*** Inserting 100000 docs
 * Add 100000 docs, ok:100/accepted:0 (Hz): 22000
 * Get random doc 100000X (Hz): 3800
 * All docs (Hz): 130000
 * All docs w/ include_docs (Hz): 48000
 * Changes (Hz): 16000
 * Single doc updates 1000X (Hz): 530
 * Time to run all benchmarks (sec): 40
```

With the btree cache:

```
fabric_bench:go(#{q=>8, doc_size=>small, docs=>100000}).
*** Parameters
 * batch_size : 1000
 * doc_size : small
 * docs : 100000
 * individual_docs : 1000
 * n : default
 * q : 8

*** Environment
 * Nodes : 1
 * Bench ver. : 1
 * N : 1
 * Q : 8
 * OS : unix/darwin
 * Couch ver. : 3.5.0-d16fc1f
 * Couch git sha: d16fc1f
 * VM details : Erlang/OTP 26 [erts-14.2.5.11] [source] [64-bit] [smp:12:12] [ds:12:12:16] [async-threads:1]

*** Inserting 100000 docs
 * Add 100000 docs, ok:100/accepted:0 (Hz): 24000
 * Get random doc 100000X (Hz): 4400
 * All docs (Hz): 140000
 * All docs w/ include_docs (Hz): 49000
 * Changes (Hz): 29000
 * Single doc updates 1000X (Hz): 680
 * Time to run all benchmarks (sec): 33
```

The idea to use a depth parameter and an ets table came from Paul J. Davis
(@davisp).
---
 rel/overlay/etc/default.ini                    |  20 ++
 src/chttpd/src/chttpd_node.erl                 |   1 +
 src/couch/priv/stats_descriptions.cfg          |  12 +
 src/couch/src/couch_bt_engine.erl              |  46 ++-
 src/couch/src/couch_bt_engine_cache.erl        | 292 +++++++++++++++++++
 src/couch/src/couch_btree.erl                  | 317 ++++++++++++++-------
 src/couch/src/couch_primary_sup.erl            |   3 +-
 .../test/eunit/couch_bt_engine_cache_test.erl  | 102 +++++++
 src/couch/test/eunit/couch_btree_tests.erl     |  28 ++
 src/couch_prometheus/src/couch_prometheus.erl  |  12 +-
 10 files changed, 723 insertions(+), 110 deletions(-)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index efdbf0c76..92405b01b 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -130,6 +130,26 @@ view_index_dir = {{view_index_dir}}
 ; downgrade or even open the databases in question.
 ;prohibit_downgrade = true
 
+[bt_engine_cache]
+; Memory used for btree engine cache. This is a cache for top levels of
+; database btrees (id tree, seq tree) and a few terms from the db header. Value
+; is in bytes.
+;max_size = 67108864
+;
+; Items not accessed in a while are eventually evicted. However, if the memory
+; used is below this percentage, then even the unused items are left in the
+; cache. The trade-off here is when a new workload starts, it may find the
+; cache with some stale items during the first few seconds and not be able to
+; insert its entries in.
+;leave_percent = 30
+;
+; Cache database btree nodes up to this depth only. Depth starts at 1 at root,
+; then at 2 the next level down, and so on. Only intermediate (pointer) nodes
+; will be cached, those are the nodes which point to other nodes, as opposed to
+; leaf key-value nodes, which hold revision trees. To disable db btree node
+; caching set the value to 0
+;db_btree_cache_depth = 3
+
 [purge]
 ; Allowed maximum number of documents in one purge request
 ;max_document_id_number = 100
diff --git a/src/chttpd/src/chttpd_node.erl b/src/chttpd/src/chttpd_node.erl
index 2a67da9e3..9a567f4db 100644
--- a/src/chttpd/src/chttpd_node.erl
+++ b/src/chttpd/src/chttpd_node.erl
@@ -286,6 +286,7 @@ get_stats() ->
         {run_queue, SQ},
         {run_queue_dirty_cpu, DCQ},
         {ets_table_count, length(ets:all())},
+        {bt_engine_cache, couch_bt_engine_cache:info()},
         {context_switches, element(1, statistics(context_switches))},
         {reductions, element(1, statistics(reductions))},
         {garbage_collection_count, NumberOfGCs},
diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg
index 6a7120f87..18f32a47a 100644
--- a/src/couch/priv/stats_descriptions.cfg
+++ b/src/couch/priv/stats_descriptions.cfg
@@ -474,3 +474,15 @@
     {type, counter},
     {desc, <<"number of mango selector evaluations">>}
 ]}.
+{[couchdb, bt_engine_cache, hits], [
+    {type, counter},
+    {desc, <<"number of bt_engine cache hits">>}
+]}.
+{[couchdb, bt_engine_cache, misses], [
+    {type, counter},
+    {desc, <<"number of bt_engine cache misses">>}
+]}.
+{[couchdb, bt_engine_cache, full], [ + {type, counter}, + {desc, <<"number of times bt_engine cache was full">>} +]}. diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index 180c99e36..54a03978a 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -122,6 +122,11 @@ -define(PURGE_INFOS_LIMIT, purge_infos_limit). -define(COMPACTED_SEQ, compacted_seq). +-define(DEFAULT_BTREE_CACHE_DEPTH, 3). +% Priority is about how long the entry will survive in the cache initially. A +% period is about 2 seconds and each period the value is halved. +-define(HEADER_CACHE_PRIORITY, 16). + exists(FilePath) -> case is_file(FilePath) of true -> @@ -828,7 +833,8 @@ init_state(FilePath, Fd, Header0, Options) -> {split, fun ?MODULE:id_tree_split/1}, {join, fun ?MODULE:id_tree_join/2}, {reduce, fun ?MODULE:id_tree_reduce/2}, - {compression, Compression} + {compression, Compression}, + {cache_depth, btree_cache_depth()} ]), SeqTreeState = couch_bt_engine_header:seq_tree_state(Header), @@ -836,28 +842,32 @@ init_state(FilePath, Fd, Header0, Options) -> {split, fun ?MODULE:seq_tree_split/1}, {join, fun ?MODULE:seq_tree_join/2}, {reduce, fun ?MODULE:seq_tree_reduce/2}, - {compression, Compression} + {compression, Compression}, + {cache_depth, btree_cache_depth()} ]), LocalTreeState = couch_bt_engine_header:local_tree_state(Header), {ok, LocalTree} = couch_btree:open(LocalTreeState, Fd, [ {split, fun ?MODULE:local_tree_split/1}, {join, fun ?MODULE:local_tree_join/2}, - {compression, Compression} + {compression, Compression}, + {cache_depth, btree_cache_depth()} ]), PurgeTreeState = couch_bt_engine_header:purge_tree_state(Header), {ok, PurgeTree} = couch_btree:open(PurgeTreeState, Fd, [ {split, fun ?MODULE:purge_tree_split/1}, {join, fun ?MODULE:purge_tree_join/2}, - {reduce, fun ?MODULE:purge_tree_reduce/2} + {reduce, fun ?MODULE:purge_tree_reduce/2}, + {cache_depth, btree_cache_depth()} ]), PurgeSeqTreeState = couch_bt_engine_header:purge_seq_tree_state(Header), {ok, PurgeSeqTree} = couch_btree:open(PurgeSeqTreeState, Fd, [ {split, fun ?MODULE:purge_seq_tree_split/1}, {join, fun ?MODULE:purge_seq_tree_join/2}, - {reduce, fun ?MODULE:purge_tree_reduce/2} + {reduce, fun ?MODULE:purge_tree_reduce/2}, + {cache_depth, btree_cache_depth()} ]), ok = couch_file:set_db_pid(Fd, self()), @@ -1195,8 +1205,14 @@ get_header_term(#st{header = Header} = St, Key, Default) when is_atom(Key) -> undefined -> Default; Pointer when is_integer(Pointer) -> - {ok, Term} = couch_file:pread_term(St#st.fd, Pointer), - Term + case couch_bt_engine_cache:lookup({St#st.fd, Pointer}) of + undefined -> + {ok, Term} = couch_file:pread_term(St#st.fd, Pointer), + couch_bt_engine_cache:insert({St#st.fd, Pointer}, Term, ?HEADER_CACHE_PRIORITY), + Term; + Term -> + Term + end end. set_header_term(#st{} = St, Key, Term) when is_atom(Key) -> @@ -1207,6 +1223,20 @@ set_header_term(#st{} = St, Key, Term) when is_atom(Key) -> }. set_header_term(Fd, Header, Key, Term, Compression) when is_atom(Key) -> + case couch_bt_engine_header:get(Header, Key) of + Pointer when is_integer(Pointer) -> + % Reset old one to 0 usage. Some old snapshot may still + % see it and use. But it will only survive only one more + % interval at most otherwise + couch_bt_engine_cache:reset({Fd, Pointer}); + _ -> + ok + end, TermOpts = [{compression, Compression}], {ok, Ptr, _} = couch_file:append_term(Fd, Term, TermOpts), - couch_bt_engine_header:set(Header, Key, Ptr). 
+ Result = couch_bt_engine_header:set(Header, Key, Ptr), + couch_bt_engine_cache:insert({Fd, Ptr}, Term, ?HEADER_CACHE_PRIORITY), + Result. + +btree_cache_depth() -> + config:get_integer("bt_engine_cache", "db_btree_cache_depth", ?DEFAULT_BTREE_CACHE_DEPTH). diff --git a/src/couch/src/couch_bt_engine_cache.erl b/src/couch/src/couch_bt_engine_cache.erl new file mode 100644 index 000000000..a39564957 --- /dev/null +++ b/src/couch/src/couch_bt_engine_cache.erl @@ -0,0 +1,292 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_bt_engine_cache). + +-include_lib("stdlib/include/ms_transform.hrl"). + +% Main API +% +-export([ + insert/2, + insert/3, + reset/1, + lookup/1, + info/0, + tables/0 +]). + +% Supervision and start API +% +-export([ + create_tables/0, + sup_children/0, + start_link/1, + init/1 +]). + +-define(DEFAULT_SIZE, 67108864). +-define(DEFAULT_LEAVE_PERCENT, 30). +-define(INTERVAL_MSEC, 3000). +% How often cleaners check if the ets size increases. This is used in cases +% when initially, at the start the cleaning interval, ets table size is below +% "leave percent". Then we skip cleaning all the 0 usage entries. However, if a +% new set of requests come in soon after, but before the next clean-up interval +% they'll fail since the cache would be full. To be able to react quicker and +% make room, cleaners poll table sizes a bit more often. +-define(CLEANUP_INTERVAL_MSEC, 100). +% 1 bsl 58, power of 2 that's still an immediate integer +-define(MAX_PRIORITY, 288230376151711744). +-define(PTERM_KEY, {?MODULE, caches}). +% Metrics +-define(HITS, hits). +-define(MISSES, misses). +-define(FULL, full). + +-record(cache, {tid, max_size}). + +% Main API + +insert(Key, Term) -> + insert(Key, Term, 1). + +insert(Key, Term, Priority) when is_integer(Priority) -> + Priority1 = min(?MAX_PRIORITY, max(0, Priority)), + case get_cache(Key) of + #cache{tid = Tid, max_size = Max} -> + case ets:info(Tid, memory) < Max of + true -> + case ets:insert_new(Tid, {Key, Priority1, Term}) of + true -> + true; + false -> + bump_usage(Tid, Key), + false + end; + false -> + bump_metric(?FULL), + false + end; + undefined -> + false + end. + +reset(Key) -> + case get_cache(Key) of + #cache{tid = Tid} -> reset_usage(Tid, Key); + undefined -> true + end. + +lookup(Key) -> + case get_cache(Key) of + #cache{tid = Tid} -> + case ets:lookup_element(Tid, Key, 3, undefined) of + undefined -> + bump_metric(?MISSES), + undefined; + Term -> + bump_usage(Tid, Key), + bump_metric(?HITS), + Term + end; + undefined -> + undefined + end. 
+ +info() -> + case persistent_term:get(?PTERM_KEY, undefined) of + Caches when is_tuple(Caches) -> + SizeMem = [info(C) || C <- tuple_to_list(Caches)], + MaxMem = [Max || #cache{max_size = Max} <- tuple_to_list(Caches)], + {Sizes, Mem} = lists:unzip(SizeMem), + #{ + size => lists:sum(Sizes), + memory => lists:sum(Mem), + max_memory => lists:sum(MaxMem) * wordsize(), + full => sample_metric(?FULL), + hits => sample_metric(?HITS), + misses => sample_metric(?MISSES), + shard_count => shard_count() + }; + undefined -> + #{} + end. + +tables() -> + case persistent_term:get(?PTERM_KEY, undefined) of + Caches when is_tuple(Caches) -> + [Tid || #cache{tid = Tid} <- tuple_to_list(Caches)]; + undefined -> + [] + end. + +% Supervisor helper functions + +create_tables() -> + BtCaches = [new() || _ <- lists:seq(1, shard_count())], + persistent_term:put(?PTERM_KEY, list_to_tuple(BtCaches)). + +sup_children() -> + [sup_child(I) || I <- lists:seq(1, shard_count())]. + +sup_child(N) -> + Name = list_to_atom("couch_bt_engine_cache_" ++ integer_to_list(N)), + #{id => Name, start => {?MODULE, start_link, [N]}, shutdown => brutal_kill}. + +% Process start and main loop + +start_link(N) when is_integer(N) -> + {ok, proc_lib:spawn_link(?MODULE, init, [N])}. + +init(N) -> + Caches = persistent_term:get(?PTERM_KEY), + Cache = #cache{tid = Tid} = element(N, Caches), + ets:delete_all_objects(Tid), + loop(Cache). + +loop(#cache{tid = Tid} = Cache) -> + decay(Tid), + Next = now_msec() + wait_interval(?INTERVAL_MSEC), + clean(Cache, Next - ?CLEANUP_INTERVAL_MSEC), + remove_dead(Cache), + WaitLeft = max(10, Next - now_msec()), + timer:sleep(WaitLeft), + loop(Cache). + +% Clean unused procs. If we haven't cleaned any keep polling at a higher +% rate so we react quicker if a new set of entries are added. +% +clean(#cache{tid = Tid} = Cache, Until) -> + case now_msec() < Until of + true -> + case should_clean(Cache) of + true -> + ets:match_delete(Tid, {'_', 0, '_'}); + false -> + timer:sleep(wait_interval(?CLEANUP_INTERVAL_MSEC)), + clean(Cache, Until) + end; + false -> + false + end. + +should_clean(#cache{tid = Tid, max_size = Max}) -> + ets:info(Tid, memory) >= Max * leave_percent() / 100. + +remove_dead(#cache{tid = Tid}) -> + All = pids(Tid), + Alive = sets:filter(fun is_process_alive/1, All), + Dead = sets:subtract(All, Alive), + % In OTP 27+ use sets:map/2 + Fun = fun(Pid, _) -> ets:match_delete(Tid, {{Pid, '_'}, '_', '_'}) end, + sets:fold(Fun, true, Dead). + +pids(Tid) -> + Acc = couch_util:new_set(), + try + ets:foldl(fun pids_fold/2, Acc, Tid) + catch + error:badarg -> Acc + end. + +pids_fold({{Pid, _}, _, _}, Acc) when is_pid(Pid) -> + sets:add_element(Pid, Acc); +pids_fold({_, _, _}, Acc) -> + Acc. + +new() -> + Opts = [public, {write_concurrency, true}, {read_concurrency, true}], + Max0 = round(max_size() / wordsize() / shard_count()), + % Some per-table overhead for the table metadata + Max = Max0 + round(250 * 1024 / wordsize()), + #cache{tid = ets:new(?MODULE, Opts), max_size = Max}. + +get_cache(Term) -> + case persistent_term:get(?PTERM_KEY, undefined) of + Caches when is_tuple(Caches) -> + Index = erlang:phash2(Term, tuple_size(Caches)), + #cache{} = element(1 + Index, Caches); + undefined -> + undefined + end. + +bump_usage(Tid, Key) -> + % We're updating the second field incrementing it by 1 and clamping it + % at ?MAX_PRIORITY. 
We don't set the default for the update_counter + % specifically to avoid creating bogus entries just from updating the + % counter, so expect the error:badarg here sometimes. + UpdateOp = {2, 1, ?MAX_PRIORITY, ?MAX_PRIORITY}, + try + ets:update_counter(Tid, Key, UpdateOp) + catch + error:badarg -> ok + end. + +reset_usage(Tid, Key) -> + % Reset the value of the usage to 0. Since max value is ?MAX_PRIORITY, + % subtract that and clamp it at 0. Do not provide a default since if an + % entry is missing we don't want to create a bogus one from this operation. + UpdateOp = {2, -?MAX_PRIORITY, 0, 0}, + try + ets:update_counter(Tid, Key, UpdateOp) + catch + error:badarg -> ok + end. + +info(#cache{tid = Tid}) -> + Memory = ets:info(Tid, memory) * wordsize(), + Size = ets:info(Tid, size), + {Size, Memory}. + +decay(Tid) -> + MatchSpec = ets:fun2ms( + fun({Key, Usage, Term}) when Usage > 0 -> + {Key, Usage bsr 1, Term} + end + ), + ets:select_replace(Tid, MatchSpec). + +shard_count() -> + % Use a minimum size of 16 even for there are less than 16 schedulers + % to keep the total tables size a bit smaller + max(16, erlang:system_info(schedulers)). + +wait_interval(Interval) -> + Jitter = rand:uniform(max(1, Interval bsr 1)), + Interval + Jitter. + +max_size() -> + config:get_integer("bt_engine_cache", "max_size", ?DEFAULT_SIZE). + +leave_percent() -> + Val = config:get_integer("bt_engine_cache", "leave_percent", ?DEFAULT_LEAVE_PERCENT), + max(0, min(90, Val)). + +now_msec() -> + erlang:monotonic_time(millisecond). + +bump_metric(Metric) when is_atom(Metric) -> + couch_stats:increment_counter([couchdb, bt_engine_cache, Metric]). + +sample_metric(Metric) when is_atom(Metric) -> + try + couch_stats:sample([couchdb, bt_engine_cache, Metric]) + catch + throw:unknown_metric -> + 0 + end. + +% ETS sizes are expressed in "words". To get the byte size need to multiply +% memory sizes by the wordsize. On 64 bit systems this should be 8 +% +wordsize() -> + erlang:system_info(wordsize). diff --git a/src/couch/src/couch_btree.erl b/src/couch/src/couch_btree.erl index 788b5f120..f8c8ab675 100644 --- a/src/couch/src/couch_btree.erl +++ b/src/couch/src/couch_btree.erl @@ -20,6 +20,11 @@ -include_lib("couch/include/couch_db.hrl"). +% For the btree cache, the priority of the root node will be +% this value. The priority is roughly how many cleanup interval +% (second) they'll survive without any updates in the cache +-define(ROOT_NODE_CACHE_PRIORITY, 8). + -record(btree, { fd, root, @@ -27,7 +32,8 @@ assemble_kv, less, reduce = nil, - compression = ?DEFAULT_COMPRESSION + compression = ?DEFAULT_COMPRESSION, + cache_depth = 0 }). -define(FILL_RATIO, 0.5). @@ -62,7 +68,9 @@ set_options(Bt, [{less, Less} | Rest]) -> set_options(Bt, [{reduce, Reduce} | Rest]) -> set_options(Bt#btree{reduce = Reduce}, Rest); set_options(Bt, [{compression, Comp} | Rest]) -> - set_options(Bt#btree{compression = Comp}, Rest). + set_options(Bt#btree{compression = Comp}, Rest); +set_options(Bt, [{cache_depth, Depth} | Rest]) when is_integer(Depth) -> + set_options(Bt#btree{cache_depth = Depth}, Rest). open(State, Fd, Options) -> {ok, set_options(#btree{root = State, fd = Fd}, Options)}. 
@@ -111,7 +119,8 @@ fold_reduce(#btree{root = Root} = Bt, Fun, Acc, Options) -> [], KeyGroupFun, Fun, - Acc + Acc, + 0 ), if GroupedKey2 == undefined -> @@ -258,7 +267,8 @@ fold(#btree{root = Root} = Bt, Fun, Acc, Options) -> InRange, Dir, convert_fun_arity(Fun), - Acc + Acc, + 0 ); StartKey -> stream_node( @@ -269,7 +279,8 @@ fold(#btree{root = Root} = Bt, Fun, Acc, Options) -> InRange, Dir, convert_fun_arity(Fun), - Acc + Acc, + 0 ) end, case Result of @@ -307,7 +318,7 @@ query_modify(Bt, LookupKeys, InsertValues, RemoveKeys) -> end end, Actions = lists:sort(SortFun, lists:append([InsertActions, RemoveActions, FetchActions])), - {ok, KeyPointers, QueryResults} = modify_node(Bt, Root, Actions, []), + {ok, KeyPointers, QueryResults} = modify_node(Bt, Root, Actions, [], 0), {ok, NewRoot} = complete_root(Bt, KeyPointers), {ok, QueryResults, Bt#btree{root = NewRoot}}. @@ -323,64 +334,87 @@ lookup(#btree{root = Root, less = Less} = Bt, Keys) -> undefined -> lists:sort(Keys); _ -> lists:sort(Less, Keys) end, - {ok, SortedResults} = lookup(Bt, Root, SortedKeys), + {ok, SortedResults} = lookup(Bt, Root, SortedKeys, 0), % We want to return the results in the same order as the keys were input % but we may have changed the order when we sorted. So we need to put the % order back into the results. couch_util:reorder_results(Keys, SortedResults). -lookup(_Bt, nil, Keys) -> +lookup(_Bt, nil, Keys, _Depth) -> {ok, [{Key, not_found} || Key <- Keys]}; -lookup(Bt, Node, Keys) -> +lookup(Bt, Node, Keys, Depth0) -> + Depth = Depth0 + 1, Pointer = element(1, Node), - {NodeType, NodeList} = get_node(Bt, Pointer), + {NodeType, NodeList} = get_node(Bt, Pointer, Depth), case NodeType of kp_node -> - lookup_kpnode(Bt, list_to_tuple(NodeList), 1, Keys, []); + lookup_kpnode(Bt, list_to_tuple(NodeList), 1, Keys, [], Depth); kv_node -> - lookup_kvnode(Bt, list_to_tuple(NodeList), 1, Keys, []) + lookup_kvnode(Bt, list_to_tuple(NodeList), 1, Keys, [], Depth) end. -lookup_kpnode(_Bt, _NodeTuple, _LowerBound, [], Output) -> +lookup_kpnode(_Bt, _NodeTuple, _LowerBound, [], Output, _Depth) -> {ok, lists:reverse(Output)}; -lookup_kpnode(_Bt, NodeTuple, LowerBound, Keys, Output) when tuple_size(NodeTuple) < LowerBound -> +lookup_kpnode(_Bt, NodeTuple, LowerBound, Keys, Output, _Depth) when + tuple_size(NodeTuple) < LowerBound +-> {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])}; -lookup_kpnode(Bt, NodeTuple, LowerBound, [FirstLookupKey | _] = LookupKeys, Output) -> +lookup_kpnode(Bt, NodeTuple, LowerBound, [FirstLookupKey | _] = LookupKeys, Output, Depth) -> N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), FirstLookupKey), {Key, PointerInfo} = element(N, NodeTuple), SplitFun = fun(LookupKey) -> not less(Bt, Key, LookupKey) end, case lists:splitwith(SplitFun, LookupKeys) of {[], GreaterQueries} -> - lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, Output); + lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, Output, Depth); {LessEqQueries, GreaterQueries} -> - {ok, Results} = lookup(Bt, PointerInfo, LessEqQueries), - lookup_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, lists:reverse(Results, Output)) + {ok, Results} = lookup(Bt, PointerInfo, LessEqQueries, Depth), + lookup_kpnode( + Bt, NodeTuple, N + 1, GreaterQueries, lists:reverse(Results, Output), Depth + ) end. 
-lookup_kvnode(_Bt, _NodeTuple, _LowerBound, [], Output) -> +lookup_kvnode(_Bt, _NodeTuple, _LowerBound, [], Output, _Depth) -> {ok, lists:reverse(Output)}; -lookup_kvnode(_Bt, NodeTuple, LowerBound, Keys, Output) when tuple_size(NodeTuple) < LowerBound -> +lookup_kvnode(_Bt, NodeTuple, LowerBound, Keys, Output, _Depth) when + tuple_size(NodeTuple) < LowerBound +-> % keys not found {ok, lists:reverse(Output, [{Key, not_found} || Key <- Keys])}; -lookup_kvnode(Bt, NodeTuple, LowerBound, [LookupKey | RestLookupKeys], Output) -> +lookup_kvnode(Bt, NodeTuple, LowerBound, [LookupKey | RestLookupKeys], Output, Depth) -> N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), LookupKey), {Key, Value} = element(N, NodeTuple), case less(Bt, LookupKey, Key) of true -> % LookupKey is less than Key - lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, not_found} | Output]); + lookup_kvnode( + Bt, NodeTuple, N, RestLookupKeys, [{LookupKey, not_found} | Output], Depth + ); false -> case less(Bt, Key, LookupKey) of true -> % LookupKey is greater than Key - lookup_kvnode(Bt, NodeTuple, N + 1, RestLookupKeys, [ - {LookupKey, not_found} | Output - ]); + lookup_kvnode( + Bt, + NodeTuple, + N + 1, + RestLookupKeys, + [ + {LookupKey, not_found} | Output + ], + Depth + ); false -> % LookupKey is equal to Key - lookup_kvnode(Bt, NodeTuple, N, RestLookupKeys, [ - {LookupKey, {ok, assemble(Bt, LookupKey, Value)}} | Output - ]) + lookup_kvnode( + Bt, + NodeTuple, + N, + RestLookupKeys, + [ + {LookupKey, {ok, assemble(Bt, LookupKey, Value)}} | Output + ], + Depth + ) end end. @@ -436,25 +470,27 @@ get_chunk_size() -> 1279 end. -modify_node(Bt, RootPointerInfo, Actions, QueryOutput) -> +modify_node(Bt, RootPointerInfo, Actions, QueryOutput, Depth0) -> + Depth = Depth0 + 1, {NodeType, NodeList} = case RootPointerInfo of nil -> {kv_node, []}; _Tuple -> Pointer = element(1, RootPointerInfo), - get_node(Bt, Pointer) + get_node(Bt, Pointer, Depth) end, NodeTuple = list_to_tuple(NodeList), {ok, NewNodeList, QueryOutput2} = case NodeType of - kp_node -> modify_kpnode(Bt, NodeTuple, 1, Actions, [], QueryOutput); - kv_node -> modify_kvnode(Bt, NodeTuple, 1, Actions, [], QueryOutput) + kp_node -> modify_kpnode(Bt, NodeTuple, 1, Actions, [], QueryOutput, Depth); + kv_node -> modify_kvnode(Bt, NodeTuple, 1, Actions, [], QueryOutput, Depth) end, case NewNodeList of % no nodes remain [] -> + reset_cache_usage(Bt, RootPointerInfo, Depth), {ok, [], QueryOutput2}; % nothing changed NodeList -> @@ -466,6 +502,7 @@ modify_node(Bt, RootPointerInfo, Actions, QueryOutput) -> nil -> write_node(Bt, NodeType, NewNodeList); _ -> + reset_cache_usage(Bt, RootPointerInfo, Depth), {LastKey, _LastValue} = element(tuple_size(NodeTuple), NodeTuple), OldNode = {LastKey, RootPointerInfo}, write_node(Bt, OldNode, NodeType, NodeList, NewNodeList) @@ -492,7 +529,30 @@ reduce_tree_size(kp_node, _NodeSize, [{_K, {_P, _Red, nil}} | _]) -> reduce_tree_size(kp_node, NodeSize, [{_K, {_P, _Red, Sz}} | NodeList]) -> reduce_tree_size(kp_node, NodeSize + Sz, NodeList). -get_node(#btree{fd = Fd}, NodePos) -> +reset_cache_usage(_, nil, _Depth) -> + ok; +reset_cache_usage(#btree{cache_depth = Max}, _, Depth) when Depth > Max -> + ok; +reset_cache_usage(#btree{fd = Fd}, RootPointerInfo, _Depth) -> + Pointer = element(1, RootPointerInfo), + couch_bt_engine_cache:reset({Fd, Pointer}). 
+ +get_node(#btree{fd = Fd, cache_depth = Max}, NodePos, Depth) when Depth =< Max -> + case couch_bt_engine_cache:lookup({Fd, NodePos}) of + undefined -> + {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos), + case NodeType of + kp_node -> + Priority = max(1, ?ROOT_NODE_CACHE_PRIORITY - Depth), + couch_bt_engine_cache:insert({Fd, NodePos}, NodeList, Priority); + kv_node -> + ok + end, + {NodeType, NodeList}; + NodeList -> + {kp_node, NodeList} + end; +get_node(#btree{fd = Fd}, NodePos, _Depth) -> {ok, {NodeType, NodeList}} = couch_file:pread_term(Fd, NodePos), {NodeType, NodeList}. @@ -568,9 +628,9 @@ old_list_is_prefix([KV | Rest1], [KV | Rest2], Acc) -> old_list_is_prefix(_OldList, _NewList, _Acc) -> false. -modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput) -> - modify_node(Bt, nil, Actions, QueryOutput); -modify_kpnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) -> +modify_kpnode(Bt, {}, _LowerBound, Actions, [], QueryOutput, Depth) -> + modify_node(Bt, nil, Actions, QueryOutput, Depth); +modify_kpnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput, _Depth) -> {ok, lists:reverse( ResultNode, @@ -588,7 +648,8 @@ modify_kpnode( LowerBound, [{_, FirstActionKey, _} | _] = Actions, ResultNode, - QueryOutput + QueryOutput, + Depth ) -> Sz = tuple_size(NodeTuple), N = find_first_gteq(Bt, NodeTuple, LowerBound, Sz, FirstActionKey), @@ -597,7 +658,7 @@ modify_kpnode( % perform remaining actions on last node {_, PointerInfo} = element(Sz, NodeTuple), {ok, ChildKPs, QueryOutput2} = - modify_node(Bt, PointerInfo, Actions, QueryOutput), + modify_node(Bt, PointerInfo, Actions, QueryOutput, Depth), NodeList = lists:reverse( ResultNode, bounded_tuple_to_list( @@ -615,7 +676,7 @@ modify_kpnode( end, {LessEqQueries, GreaterQueries} = lists:splitwith(SplitFun, Actions), {ok, ChildKPs, QueryOutput2} = - modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput), + modify_node(Bt, PointerInfo, LessEqQueries, QueryOutput, Depth), ResultNode2 = lists:reverse( ChildKPs, bounded_tuple_to_revlist( @@ -625,7 +686,7 @@ modify_kpnode( ResultNode ) ), - modify_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, ResultNode2, QueryOutput2) + modify_kpnode(Bt, NodeTuple, N + 1, GreaterQueries, ResultNode2, QueryOutput2, Depth) end. bounded_tuple_to_revlist(_Tuple, Start, End, Tail) when Start > End -> @@ -653,7 +714,7 @@ find_first_gteq(Bt, Tuple, Start, End, Key) -> find_first_gteq(Bt, Tuple, Start, Mid, Key) end. 
-modify_kvnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput) -> +modify_kvnode(_Bt, NodeTuple, LowerBound, [], ResultNode, QueryOutput, _Depth) -> {ok, lists:reverse( ResultNode, bounded_tuple_to_list(NodeTuple, LowerBound, tuple_size(NodeTuple), []) @@ -665,7 +726,8 @@ modify_kvnode( LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], ResultNode, - QueryOutput + QueryOutput, + Depth ) when LowerBound > tuple_size(NodeTuple) -> case ActionType of insert -> @@ -675,16 +737,25 @@ modify_kvnode( LowerBound, RestActions, [{ActionKey, ActionValue} | ResultNode], - QueryOutput + QueryOutput, + Depth ); remove -> % just drop the action - modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, QueryOutput); + modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, QueryOutput, Depth); fetch -> % the key/value must not exist in the tree - modify_kvnode(Bt, NodeTuple, LowerBound, RestActions, ResultNode, [ - {not_found, {ActionKey, nil}} | QueryOutput - ]) + modify_kvnode( + Bt, + NodeTuple, + LowerBound, + RestActions, + ResultNode, + [ + {not_found, {ActionKey, nil}} | QueryOutput + ], + Depth + ) end; modify_kvnode( Bt, @@ -692,7 +763,8 @@ modify_kvnode( LowerBound, [{ActionType, ActionKey, ActionValue} | RestActions], AccNode, - QueryOutput + QueryOutput, + Depth ) -> N = find_first_gteq(Bt, NodeTuple, LowerBound, tuple_size(NodeTuple), ActionKey), {Key, Value} = element(N, NodeTuple), @@ -708,16 +780,25 @@ modify_kvnode( N, RestActions, [{ActionKey, ActionValue} | ResultNode], - QueryOutput + QueryOutput, + Depth ); remove -> % ActionKey is less than the Key, just drop the action - modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, QueryOutput); + modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, QueryOutput, Depth); fetch -> % ActionKey is less than the Key, the key/value must not exist in the tree - modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [ - {not_found, {ActionKey, nil}} | QueryOutput - ]) + modify_kvnode( + Bt, + NodeTuple, + N, + RestActions, + ResultNode, + [ + {not_found, {ActionKey, nil}} | QueryOutput + ], + Depth + ) end; false -> % ActionKey and Key are maybe equal. @@ -731,18 +812,27 @@ modify_kvnode( N + 1, RestActions, [{ActionKey, ActionValue} | ResultNode], - QueryOutput + QueryOutput, + Depth ); remove -> modify_kvnode( - Bt, NodeTuple, N + 1, RestActions, ResultNode, QueryOutput + Bt, NodeTuple, N + 1, RestActions, ResultNode, QueryOutput, Depth ); fetch -> % ActionKey is equal to the Key, insert into the QueryOuput, but re-process the node % since an identical action key can follow it. - modify_kvnode(Bt, NodeTuple, N, RestActions, ResultNode, [ - {ok, assemble(Bt, Key, Value)} | QueryOutput - ]) + modify_kvnode( + Bt, + NodeTuple, + N, + RestActions, + ResultNode, + [ + {ok, assemble(Bt, Key, Value)} | QueryOutput + ], + Depth + ) end; true -> modify_kvnode( @@ -751,7 +841,8 @@ modify_kvnode( N + 1, [{ActionType, ActionKey, ActionValue} | RestActions], [{Key, Value} | ResultNode], - QueryOutput + QueryOutput, + Depth ) end end. 
@@ -767,7 +858,8 @@ reduce_stream_node( GroupedRedsAcc, _KeyGroupFun, _Fun, - Acc + Acc, + _Depth ) -> {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}; reduce_stream_node( @@ -781,10 +873,12 @@ reduce_stream_node( GroupedRedsAcc, KeyGroupFun, Fun, - Acc + Acc, + Depth0 ) -> + Depth = Depth0 + 1, P = element(1, Node), - case get_node(Bt, P) of + case get_node(Bt, P, Depth) of {kp_node, NodeList} -> NodeList2 = adjust_dir(Dir, NodeList), reduce_stream_kp_node( @@ -798,7 +892,8 @@ reduce_stream_node( GroupedRedsAcc, KeyGroupFun, Fun, - Acc + Acc, + Depth ); {kv_node, KVs} -> KVs2 = adjust_dir(Dir, KVs), @@ -813,7 +908,8 @@ reduce_stream_node( GroupedRedsAcc, KeyGroupFun, Fun, - Acc + Acc, + Depth ) end. @@ -828,7 +924,8 @@ reduce_stream_kv_node( GroupedRedsAcc, KeyGroupFun, Fun, - Acc + Acc, + Depth ) -> GTEKeyStartKVs = case KeyStart of @@ -855,7 +952,8 @@ reduce_stream_kv_node( GroupedRedsAcc, KeyGroupFun, Fun, - Acc + Acc, + Depth ). reduce_stream_kv_node2( @@ -866,7 +964,8 @@ reduce_stream_kv_node2( GroupedRedsAcc, _KeyGroupFun, _Fun, - Acc + Acc, + _Depth ) -> {ok, Acc, GroupedRedsAcc, GroupedKVsAcc, GroupedKey}; reduce_stream_kv_node2( @@ -877,7 +976,8 @@ reduce_stream_kv_node2( GroupedRedsAcc, KeyGroupFun, Fun, - Acc + Acc, + Depth ) -> case GroupedKey of undefined -> @@ -889,7 +989,8 @@ reduce_stream_kv_node2( [], KeyGroupFun, Fun, - Acc + Acc, + Depth ); _ -> case KeyGroupFun(GroupedKey, Key) of @@ -902,7 +1003,8 @@ reduce_stream_kv_node2( GroupedRedsAcc, KeyGroupFun, Fun, - Acc + Acc, + Depth ); false -> case Fun(GroupedKey, {GroupedKVsAcc, GroupedRedsAcc}, Acc) of @@ -915,7 +1017,8 @@ reduce_stream_kv_node2( [], KeyGroupFun, Fun, - Acc2 + Acc2, + Depth ); {stop, Acc2} -> throw({stop, Acc2}) @@ -934,7 +1037,8 @@ reduce_stream_kp_node( GroupedRedsAcc, KeyGroupFun, Fun, - Acc + Acc, + Depth ) -> Nodes = case KeyStart of @@ -977,7 +1081,8 @@ reduce_stream_kp_node( GroupedRedsAcc, KeyGroupFun, Fun, - Acc + Acc, + Depth ). reduce_stream_kp_node2( @@ -991,7 +1096,8 @@ reduce_stream_kp_node2( [], KeyGroupFun, Fun, - Acc + Acc, + Depth ) -> {ok, Acc2, GroupedRedsAcc2, GroupedKVsAcc2, GroupedKey2} = reduce_stream_node( @@ -1005,7 +1111,8 @@ reduce_stream_kp_node2( [], KeyGroupFun, Fun, - Acc + Acc, + Depth ), reduce_stream_kp_node2( Bt, @@ -1018,7 +1125,8 @@ reduce_stream_kp_node2( GroupedRedsAcc2, KeyGroupFun, Fun, - Acc2 + Acc2, + Depth ); reduce_stream_kp_node2( Bt, @@ -1031,7 +1139,8 @@ reduce_stream_kp_node2( GroupedRedsAcc, KeyGroupFun, Fun, - Acc + Acc, + Depth ) -> {Grouped0, Ungrouped0} = lists:splitwith( fun({Key, _}) -> @@ -1062,7 +1171,8 @@ reduce_stream_kp_node2( GroupedReds ++ GroupedRedsAcc, KeyGroupFun, Fun, - Acc + Acc, + Depth ), reduce_stream_kp_node2( Bt, @@ -1075,7 +1185,8 @@ reduce_stream_kp_node2( GroupedRedsAcc2, KeyGroupFun, Fun, - Acc2 + Acc2, + Depth ); [] -> {ok, Acc, GroupedReds ++ GroupedRedsAcc, GroupedKVsAcc, GroupedKey} @@ -1086,40 +1197,46 @@ adjust_dir(fwd, List) -> adjust_dir(rev, List) -> lists:reverse(List). 
-stream_node(Bt, Reds, Node, StartKey, InRange, Dir, Fun, Acc) -> +stream_node(Bt, Reds, Node, StartKey, InRange, Dir, Fun, Acc, Depth0) -> + Depth = Depth0 + 1, Pointer = element(1, Node), - {NodeType, NodeList} = get_node(Bt, Pointer), + {NodeType, NodeList} = get_node(Bt, Pointer, Depth), case NodeType of kp_node -> - stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc); + stream_kp_node( + Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc, Depth + ); kv_node -> - stream_kv_node(Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc) + stream_kv_node( + Bt, Reds, adjust_dir(Dir, NodeList), StartKey, InRange, Dir, Fun, Acc, Depth + ) end. -stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc) -> +stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc, Depth0) -> + Depth = Depth0 + 1, Pointer = element(1, Node), - {NodeType, NodeList} = get_node(Bt, Pointer), + {NodeType, NodeList} = get_node(Bt, Pointer, Depth), case NodeType of kp_node -> - stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc); + stream_kp_node(Bt, Reds, adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc, Depth); kv_node -> - stream_kv_node2(Bt, Reds, [], adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc) + stream_kv_node2(Bt, Reds, [], adjust_dir(Dir, NodeList), InRange, Dir, Fun, Acc, Depth) end. -stream_kp_node(_Bt, _Reds, [], _InRange, _Dir, _Fun, Acc) -> +stream_kp_node(_Bt, _Reds, [], _InRange, _Dir, _Fun, Acc, _Depth) -> {ok, Acc}; -stream_kp_node(Bt, Reds, [{Key, Node} | Rest], InRange, Dir, Fun, Acc) -> +stream_kp_node(Bt, Reds, [{Key, Node} | Rest], InRange, Dir, Fun, Acc, Depth) -> Red = element(2, Node), case Fun(traverse, Key, Red, Acc) of {ok, Acc2} -> - case stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc2) of + case stream_node(Bt, Reds, Node, InRange, Dir, Fun, Acc2, Depth) of {ok, Acc3} -> - stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc3); + stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc3, Depth); {stop, LastReds, Acc3} -> {stop, LastReds, Acc3} end; {skip, Acc2} -> - stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc2); + stream_kp_node(Bt, [Red | Reds], Rest, InRange, Dir, Fun, Acc2, Depth); {stop, Acc2} -> {stop, Reds, Acc2} end. @@ -1134,7 +1251,7 @@ drop_nodes(Bt, Reds, StartKey, [{NodeKey, Node} | RestKPs]) -> {Reds, [{NodeKey, Node} | RestKPs]} end. -stream_kp_node(Bt, Reds, KPs, StartKey, InRange, Dir, Fun, Acc) -> +stream_kp_node(Bt, Reds, KPs, StartKey, InRange, Dir, Fun, Acc, Depth) -> {NewReds, NodesToStream} = case Dir of fwd -> @@ -1157,16 +1274,16 @@ stream_kp_node(Bt, Reds, KPs, StartKey, InRange, Dir, Fun, Acc) -> [] -> {ok, Acc}; [{_Key, Node} | Rest] -> - case stream_node(Bt, NewReds, Node, StartKey, InRange, Dir, Fun, Acc) of + case stream_node(Bt, NewReds, Node, StartKey, InRange, Dir, Fun, Acc, Depth) of {ok, Acc2} -> Red = element(2, Node), - stream_kp_node(Bt, [Red | NewReds], Rest, InRange, Dir, Fun, Acc2); + stream_kp_node(Bt, [Red | NewReds], Rest, InRange, Dir, Fun, Acc2, Depth); {stop, LastReds, Acc2} -> {stop, LastReds, Acc2} end end. 
-stream_kv_node(Bt, Reds, KVs, StartKey, InRange, Dir, Fun, Acc) -> +stream_kv_node(Bt, Reds, KVs, StartKey, InRange, Dir, Fun, Acc, Depth) -> DropFun = case Dir of fwd -> @@ -1176,11 +1293,11 @@ stream_kv_node(Bt, Reds, KVs, StartKey, InRange, Dir, Fun, Acc) -> end, {LTKVs, GTEKVs} = lists:splitwith(DropFun, KVs), AssembleLTKVs = [assemble(Bt, K, V) || {K, V} <- LTKVs], - stream_kv_node2(Bt, Reds, AssembleLTKVs, GTEKVs, InRange, Dir, Fun, Acc). + stream_kv_node2(Bt, Reds, AssembleLTKVs, GTEKVs, InRange, Dir, Fun, Acc, Depth). -stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _InRange, _Dir, _Fun, Acc) -> +stream_kv_node2(_Bt, _Reds, _PrevKVs, [], _InRange, _Dir, _Fun, Acc, _Depth) -> {ok, Acc}; -stream_kv_node2(Bt, Reds, PrevKVs, [{K, V} | RestKVs], InRange, Dir, Fun, Acc) -> +stream_kv_node2(Bt, Reds, PrevKVs, [{K, V} | RestKVs], InRange, Dir, Fun, Acc, Depth) -> case InRange(K) of false -> {stop, {PrevKVs, Reds}, Acc}; @@ -1189,7 +1306,7 @@ stream_kv_node2(Bt, Reds, PrevKVs, [{K, V} | RestKVs], InRange, Dir, Fun, Acc) - case Fun(visit, AssembledKV, {PrevKVs, Reds}, Acc) of {ok, Acc2} -> stream_kv_node2( - Bt, Reds, [AssembledKV | PrevKVs], RestKVs, InRange, Dir, Fun, Acc2 + Bt, Reds, [AssembledKV | PrevKVs], RestKVs, InRange, Dir, Fun, Acc2, Depth ); {stop, Acc2} -> {stop, {PrevKVs, Reds}, Acc2} diff --git a/src/couch/src/couch_primary_sup.erl b/src/couch/src/couch_primary_sup.erl index 32ee282d6..93a1d2112 100644 --- a/src/couch/src/couch_primary_sup.erl +++ b/src/couch/src/couch_primary_sup.erl @@ -18,6 +18,7 @@ start_link() -> supervisor:start_link({local, couch_primary_services}, ?MODULE, []). init([]) -> + ok = couch_bt_engine_cache:create_tables(), Children = [ {couch_task_status, {couch_task_status, start_link, []}, permanent, brutal_kill, worker, @@ -45,7 +46,7 @@ init([]) -> ] ]}, permanent, 5000, worker, [ets_lru]} - ] ++ couch_servers(), + ] ++ couch_bt_engine_cache:sup_children() ++ couch_servers(), {ok, {{one_for_one, 10, 3600}, Children}}. couch_servers() -> diff --git a/src/couch/test/eunit/couch_bt_engine_cache_test.erl b/src/couch/test/eunit/couch_bt_engine_cache_test.erl new file mode 100644 index 000000000..0eade12a4 --- /dev/null +++ b/src/couch/test/eunit/couch_bt_engine_cache_test.erl @@ -0,0 +1,102 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_bt_engine_cache_test). + +-include_lib("couch/include/couch_eunit.hrl"). + +couch_bt_engine_cache_test_() -> + { + foreach, + fun setup/0, + fun teardown/1, + [ + ?TDEF_FE(t_created), + ?TDEF_FE(t_insert_and_lookup), + ?TDEF_FE(t_decay_and_removal_works, 10), + ?TDEF_FE(t_pid_cleanup_works, 10) + ] + }. + +setup() -> + Ctx = test_util:start_applications([config]), + couch_bt_engine_cache:create_tables(), + #{shard_count := N} = couch_bt_engine_cache:info(), + Pids = lists:map( + fun(I) -> + {ok, Pid} = couch_bt_engine_cache:start_link(I), + Pid + end, + lists:seq(1, N) + ), + {Ctx, Pids}. 
+ +teardown({Ctx, [_ | _] = Pids}) -> + lists:foreach( + fun(Pid) -> + catch unlink(Pid), + exit(Pid, kill) + end, + Pids + ), + clear_tables(), + config:delete("bt_engine_cache", "max_size", false), + config:delete("bt_engine_cache", "leave_percent", false), + test_util:stop_applications(Ctx). + +clear_tables() -> + lists:foreach(fun ets:delete/1, couch_bt_engine_cache:tables()). + +t_created(_) -> + Info = couch_bt_engine_cache:info(), + #{size := Size, memory := Mem, shard_count := N} = Info, + ?assert(N >= 16, "shard count is greater or equal to 16"), + ?assertEqual(0, Size), + ?assert(is_integer(Mem)), + ?assert(Mem >= 0). + +t_insert_and_lookup(_) -> + ?assertError(function_clause, couch_bt_engine_cache:insert(not_a_pid, 1, foo)), + ?assertError(function_clause, couch_bt_engine_cache:insert(self(), xyz, foo)), + ?assertMatch(#{size := 0}, couch_bt_engine_cache:info()), + ?assert(couch_bt_engine_cache:insert({pid, 42}, term)), + ?assertMatch(#{size := 1}, couch_bt_engine_cache:info()), + ?assertNot(couch_bt_engine_cache:insert({pid, 42}, term)), + ?assertEqual(term, couch_bt_engine_cache:lookup({pid, 42})), + ?assertEqual(undefined, couch_bt_engine_cache:lookup({pid, 43})). + +t_decay_and_removal_works(_) -> + config:set("bt_engine_cache", "leave_percent", "0", false), + Term = [foo, bar, baz, lists:seq(1, 100)], + [couch_bt_engine_cache:insert({pid, I}, Term) || I <- lists:seq(1, 10000)], + WaitFun = fun() -> + #{size := Size} = couch_bt_engine_cache:info(), + case Size > 0 of + true -> wait; + false -> ok + end + end, + test_util:wait(WaitFun, 7500), + ?assertMatch(#{size := 0}, couch_bt_engine_cache:info()). + +t_pid_cleanup_works(_) -> + Pid = spawn(fun() -> timer:sleep(2000) end), + [couch_bt_engine_cache:insert({Pid, I}, baz) || I <- lists:seq(1, 1000)], + WaitFun = fun() -> + #{size := Size} = couch_bt_engine_cache:info(), + case Size > 0 of + true -> wait; + false -> ok + end + end, + test_util:wait(WaitFun, 7500), + ?assertMatch(#{size := 0}, couch_bt_engine_cache:info()). diff --git a/src/couch/test/eunit/couch_btree_tests.erl b/src/couch/test/eunit/couch_btree_tests.erl index 740196518..6a5f9a791 100644 --- a/src/couch/test/eunit/couch_btree_tests.erl +++ b/src/couch/test/eunit/couch_btree_tests.erl @@ -29,6 +29,15 @@ setup() -> setup_kvs(_) -> setup(). +setup_kvs_with_cache(_) -> + {ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]), + {ok, Btree} = couch_btree:open(nil, Fd, [ + {compression, none}, + {reduce, fun reduce_fun/2}, + {cache_depth, 2} + ]), + {Fd, Btree}. + setup_red() -> {_, EvenOddKVs} = lists:foldl( fun(Idx, {Key, Acc}) -> @@ -47,6 +56,7 @@ setup_red(_) -> setup_red(). teardown(Fd) when is_pid(Fd) -> + config:delete("couchdb", "btree_chunk_size", false), ok = couch_file:close(Fd); teardown({Fd, _}) -> teardown(Fd). @@ -139,6 +149,24 @@ shuffled_kvs_test_() -> } }. +sorted_kvs_with_cache_test_() -> + Funs = kvs_test_funs(), + Sorted = [{Seq, rand:uniform()} || Seq <- lists:seq(1, ?ROWS)], + { + "BTree with a cache and sorted keys", + { + setup, + fun() -> test_util:start_couch() end, + fun test_util:stop/1, + { + foreachx, + fun setup_kvs_with_cache/1, + fun teardown/2, + [{Sorted, Fun} || Fun <- Funs] + } + } + }. 
+ reductions_test_() -> { "BTree reductions", diff --git a/src/couch_prometheus/src/couch_prometheus.erl b/src/couch_prometheus/src/couch_prometheus.erl index 4d053d1af..eb61d7c1f 100644 --- a/src/couch_prometheus/src/couch_prometheus.erl +++ b/src/couch_prometheus/src/couch_prometheus.erl @@ -68,7 +68,8 @@ get_system_stats() -> get_internal_replication_jobs_stat(), get_membership_stat(), get_membership_nodes(), - get_distribution_stats() + get_distribution_stats(), + get_bt_engine_cache_stats() ]). get_uptime_stat() -> @@ -374,3 +375,12 @@ get_distribution_stats() -> get_ets_stats() -> NumTabs = length(ets:all()), to_prom(erlang_ets_table, gauge, "number of ETS tables", NumTabs). + +get_bt_engine_cache_stats() -> + Stats = couch_bt_engine_cache:info(), + Size = maps:get(size, Stats, 0), + Mem = maps:get(memory, Stats, 0), + [ + to_prom(couchdb_bt_engine_cache_memory, gauge, "memory used by the btree cache", Mem), + to_prom(couchdb_bt_engine_cache_size, gauge, "number of entries in the btree cache", Size) + ].
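As a rough usage sketch (not part of the commit): assuming the cache tables
have been created by couch_primary_sup on a running node, the new API exercised
by the eunit test above could be poked at from a remsh roughly like this, with
a key in the same `{Fd, Pointer}` shape couch_bt_engine uses:

```
%% Hypothetical remsh snippet. insert/3 returns false when the shard is
%% already full, and lookup/1 returns `undefined` on a miss while bumping
%% the misses counter.
true = couch_bt_engine_cache:insert({self(), 1022}, {kp_node, []}, 8),
{kp_node, []} = couch_bt_engine_cache:lookup({self(), 1022}),
#{hits := _, misses := _, full := _} = couch_bt_engine_cache:info().
```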