This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit a1214bbb519a576a795bbeac2144904c364b82bf Author: Nick Vatamaniuc <vatam...@gmail.com> AuthorDate: Mon Jul 21 10:53:01 2025 -0400 Add time-seq to the db header and update it on commit Since time-seq is fixed-size and well under 1KB when serialized, handle it like we handle epochs in the header. That is simpler than adding a new btree or having to juggle file term pointers. When we write the 4KB db header block, most of it is empty anyway, so we'll use a few hundred more bytes of it for the time-seq data structure and, as a result, gain the ability to map update sequences to time intervals. This change is downgrade-safe because it's backwards compatible with previously supported disk format versions: it's possible to safely downgrade to a version from before this feature was added. That is achieved by re-using a very old header field that has been set to 0 for many years. Downgraded versions will simply ignore the new data structure. This means we don't need to run compaction to upgrade anything, or to create an extra intermediate release version in between to allow for safe downgrades. For simplicity, time-seq tracking is per-shard. During shard splitting or compaction, the time-seq data structure is preserved. If the user moves the shard to another node, it will also be preserved. However, if shard files are manually truncated and rebuilt, then the updates in that shard file will appear to have happened at a later time. As a result, the user might then get more (older) documents from that copy. In the context of the time-based _changes feed implementation, this would look like a rewind for that shard copy. However, regular _changes feeds can already rewind when shards are manipulated externally, and that behavior is documented, so this is in line with the current behavior.
--- src/couch/src/couch_bt_engine.erl | 28 ++++++++++++++--- src/couch/src/couch_bt_engine_compactor.erl | 8 +++-- src/couch/src/couch_bt_engine_header.erl | 35 ++++++++++++++++++---- src/couch/src/couch_db.erl | 24 +++++++++++++++ src/couch/src/couch_db_engine.erl | 24 +++++++++++++++ src/couch/src/couch_db_int.hrl | 9 ++++-- src/couch/src/couch_db_split.erl | 4 ++- src/couch/src/couch_db_updater.erl | 27 ++++++++++++++--- .../eunit/couch_bt_engine_compactor_ev_tests.erl | 32 ++++++++++++++++---- 9 files changed, 166 insertions(+), 25 deletions(-) diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index 54a03978a..99c87f990 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -64,6 +64,9 @@ purge_docs/3, copy_purge_infos/2, + get_time_seq/1, + set_time_seq/2, + commit_data/1, open_write_stream/2, @@ -121,6 +124,7 @@ -define(REVS_LIMIT, revs_limit). -define(PURGE_INFOS_LIMIT, purge_infos_limit). -define(COMPACTED_SEQ, compacted_seq). +-define(TIME_SEQ_PTR, time_seq_ptr). -define(DEFAULT_BTREE_CACHE_DEPTH, 3). % Priority is about how long the entry will survive in the cache initially. A @@ -324,6 +328,11 @@ get_security(#st{} = St) -> get_props(#st{} = St) -> get_header_term(St, ?PROPS_PTR, []). +get_time_seq(#st{} = St) -> + TSeq = get_header_term(St, ?TIME_SEQ_PTR, couch_time_seq:new()), + % This may upgrade the data structure to a new version + couch_time_seq:new(TSeq). + get_update_seq(#st{header = Header}) -> couch_bt_engine_header:get(Header, ?UPDATE_SEQ). @@ -354,6 +363,15 @@ set_security(#st{} = St, NewSecurity) -> set_props(#st{} = St, Props) -> {ok, increment_update_seq(set_header_term(St, ?PROPS_PTR, Props))}. +set_time_seq(#st{} = St, TSeq) -> + % Expect this to not change very often to make sure to + % only update it if it's actual value changed + OldTSeq = get_time_seq(St), + case TSeq =:= OldTSeq of + true -> {ok, St}; + false -> {ok, set_header_term(St, ?TIME_SEQ_PTR, TSeq)} + end. 
+ open_docs(#st{} = St, DocIds) -> Results = couch_btree:lookup(St#st.id_tree, DocIds), lists:map( @@ -1147,7 +1165,6 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) -> } = OldSt, #st{ filepath = CompactDataPath, - header = Header, local_tree = NewLocal1 } = NewSt1, @@ -1158,8 +1175,11 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) -> {ok, _, LocalDocs} = couch_btree:foldl(OldLocal, LoadFun, []), {ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs), - {ok, NewSt2} = commit_data(NewSt1#st{ - header = couch_bt_engine_header:set(Header, [ + % copy time_seq structure to get the most recent version of it + {ok, NewSt2} = set_time_seq(NewSt1, get_time_seq(OldSt)), + + {ok, NewSt3} = commit_data(NewSt2#st{ + header = couch_bt_engine_header:set(NewSt2#st.header, [ {?COMPACTED_SEQ, get_update_seq(OldSt)}, {?REVS_LIMIT, get_revs_limit(OldSt)}, {?PURGE_INFOS_LIMIT, get_purge_infos_limit(OldSt)} @@ -1188,7 +1208,7 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) -> % And return our finished new state {ok, - NewSt2#st{ + NewSt3#st{ filepath = FilePath }, undefined}. diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl index f16ae4184..65aa76254 100644 --- a/src/couch/src/couch_bt_engine_compactor.erl +++ b/src/couch/src/couch_bt_engine_compactor.erl @@ -383,11 +383,15 @@ copy_compact(#comp_st{} = CompSt) -> Props = couch_bt_engine:get_props(St), {ok, NewSt5} = couch_bt_engine:copy_props(NewSt4, Props), + % Copy time-seq structure over + TSeq = couch_bt_engine:get_time_seq(St), + {ok, NewSt6} = couch_bt_engine:set_time_seq(NewSt5, TSeq), + FinalUpdateSeq = couch_bt_engine:get_update_seq(St), - {ok, NewSt6} = couch_bt_engine:set_update_seq(NewSt5, FinalUpdateSeq), + {ok, NewSt7} = couch_bt_engine:set_update_seq(NewSt6, FinalUpdateSeq), CompSt#comp_st{ - new_st = NewSt6 + new_st = NewSt7 }. 
copy_docs(St, #st{} = NewSt, MixedInfos, Retry) -> diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl index dba0dba91..ec9a4322b 100644 --- a/src/couch/src/couch_bt_engine_header.erl +++ b/src/couch/src/couch_bt_engine_header.erl @@ -38,7 +38,8 @@ revs_limit/1, uuid/1, epochs/1, - compacted_seq/1 + compacted_seq/1, + time_seq_ptr/1 ]). -include_lib("stdlib/include/assert.hrl"). @@ -58,7 +59,7 @@ -record(db_header, { disk_version = ?LATEST_DISK_VERSION, update_seq = 0, - unused = 0, + time_seq_ptr = undefined, id_tree_state = nil, seq_tree_state = nil, local_tree_state = nil, @@ -105,7 +106,8 @@ upgrade(Header) -> fun upgrade_disk_version/1, fun upgrade_uuid/1, fun upgrade_epochs/1, - fun upgrade_compacted_seq/1 + fun upgrade_compacted_seq/1, + fun upgrade_time_seq/1 ], lists:foldl( fun(F, HdrAcc) -> @@ -176,6 +178,9 @@ epochs(Header) -> compacted_seq(Header) -> get_field(Header, compacted_seq). +time_seq_ptr(Header) -> + get_field(Header, time_seq_ptr). + purge_infos_limit(Header) -> get_field(Header, purge_infos_limit). @@ -329,6 +334,17 @@ upgrade_compacted_seq(#db_header{} = Header) -> Header end. +upgrade_time_seq(#db_header{} = Header) -> + case Header#db_header.time_seq_ptr of + 0 -> + % This used to be an unused, always set to 0 field before, + % on upgrade upgrade it to the default unset time_seq_ptr + % value: undefined + Header#db_header{time_seq_ptr = undefined}; + _ -> + Header + end. + latest(?LATEST_DISK_VERSION) -> true; latest(N) when is_integer(N), N < ?LATEST_DISK_VERSION -> @@ -337,6 +353,7 @@ latest(_Else) -> undefined. -ifdef(TEST). + -include_lib("couch/include/couch_eunit.hrl"). mk_header(Vsn) -> @@ -347,8 +364,8 @@ mk_header(Vsn) -> Vsn, % update_seq 100, - % unused - 0, + % time_seq_ptr + boom, % id_tree_state foo, % seq_tree_state @@ -477,6 +494,14 @@ get_epochs_from_old_header_test() -> Vsn5Header = mk_header(5), ?assertEqual(undefined, epochs(Vsn5Header)). 
+upgrade_time_seq_test() -> + Header = mk_header(8), + % time_seq's field was reused from an old field which was set to 0 so check + % that we can upgrade from 0 + HeaderWith0Unused = setelement(4, Header, 0), + Upgrade = upgrade(HeaderWith0Unused), + ?assertEqual(undefined, time_seq_ptr(Upgrade)). + tuple_uprade_test_() -> { foreach, diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 3100ecdc7..bca9b8e0a 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -56,6 +56,11 @@ get_oldest_purge_seq/1, get_purge_infos_limit/1, + time_seq_since/2, + time_seq_histogram/1, + get_time_seq/1, + set_time_seq/2, + is_db/1, is_system_db/1, is_clustered/1, @@ -568,6 +573,25 @@ get_oldest_purge_seq(#db{} = Db) -> get_purge_infos_limit(#db{} = Db) -> couch_db_engine:get_purge_infos_limit(Db). +time_seq_since(#db{time_seq = TSeq} = Db, Time) when is_integer(Time), Time >= 0 -> + case couch_time_seq:since(TSeq, Time) of + Seq when is_integer(Seq) -> Seq; + now -> couch_db:get_update_seq(Db) + end. + +time_seq_histogram(#db{time_seq = TSeq} = Db) -> + UpdateSeq = couch_db:get_update_seq(Db), + couch_time_seq:histogram(TSeq, UpdateSeq). + +get_time_seq(#db{time_seq = TSeq}) -> + TSeq. + +set_time_seq(#db{main_pid = Pid} = Db, #{} = TSeq) -> + check_is_admin(Db), + gen_server:call(Pid, {set_time_seq, TSeq}, infinity); +set_time_seq(_Db, _TSeq) -> + throw(invalid_time_seq). + get_pid(#db{main_pid = Pid}) -> Pid. diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl index 54f2c1482..d83e0ed9d 100644 --- a/src/couch/src/couch_db_engine.erl +++ b/src/couch/src/couch_db_engine.erl @@ -232,6 +232,11 @@ % Get the current properties. -callback get_props(DbHandle :: db_handle()) -> Props :: [any()]. +% Return the couch_time_seq structure. That is a small, fixed size, +% exponentially decaying set of time bins, mapping rough time intervals to db +% update sequences. 
Use couch_time_seq to access and update the data structure. +-callback get_time_seq(DbHandle :: db_handle()) -> TimeSeq :: any(). + % This information is displayed in the database info poperties. It % should just be a list of {Name::atom(), Size::non_neg_integer()} % tuples that will then be combined across shards. Currently, @@ -290,6 +295,14 @@ -callback set_props(DbHandle :: db_handle(), Props :: any()) -> {ok, NewDbHandle :: db_handle()}. +% This function is only called by couch_db_updater and couch_db_split, and so +% is guaranteed to be processed sequentially (or "single threaded" as mentioned +% in other API comments in this module). The database should simply store +% provided TimeSeq struct in file and reference it in the header somewhere. + +-callback set_time_seq(DbHandle :: db_handle(), TimeSeq :: any()) -> + {ok, NewDbHandle :: db_handle()}. + % Set the current update sequence of the database. The intention is to use this % when copying a database such that the destination update sequence should % match exactly the source update sequence. @@ -677,6 +690,7 @@ get_revs_limit/1, get_security/1, get_props/1, + get_time_seq/1, get_size_info/1, get_partition_info/2, get_update_seq/1, @@ -686,6 +700,7 @@ set_security/2, set_purge_infos_limit/2, set_props/2, + set_time_seq/2, set_update_seq/2, @@ -821,6 +836,10 @@ get_revs_limit(#db{} = Db) -> #db{engine = {Engine, EngineState}} = Db, Engine:get_revs_limit(EngineState). +get_time_seq(#db{} = Db) -> + #db{engine = {Engine, EngineState}} = Db, + Engine:get_time_seq(EngineState). + get_security(#db{} = Db) -> #db{engine = {Engine, EngineState}} = Db, Engine:get_security(EngineState). @@ -865,6 +884,11 @@ set_props(#db{} = Db, Props) -> {ok, NewSt} = Engine:set_props(EngineState, Props), {ok, Db#db{engine = {Engine, NewSt}}}. +set_time_seq(#db{} = Db, TimeSeq) -> + #db{engine = {Engine, EngineState}} = Db, + {ok, NewSt} = Engine:set_time_seq(EngineState, TimeSeq), + {ok, Db#db{engine = {Engine, NewSt}}}. 
+ set_update_seq(#db{} = Db, UpdateSeq) -> #db{engine = {Engine, EngineState}} = Db, {ok, NewSt} = Engine:set_update_seq(EngineState, UpdateSeq), diff --git a/src/couch/src/couch_db_int.hrl b/src/couch/src/couch_db_int.hrl index 7da0ce5df..1911c83f4 100644 --- a/src/couch/src/couch_db_int.hrl +++ b/src/couch/src/couch_db_int.hrl @@ -32,9 +32,12 @@ before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc after_doc_read = nil, % nil | fun(Doc, Db) -> NewDoc - % feature removed in 3.x, but field kept to avoid changing db record size - % and breaking rolling cluster upgrade - waiting_delayed_commit_deprecated, + % In 2.x versions this field was called waiting_delayed_commit. + % In 3.0->3.5 versions it was deprecated and named waiting_delayed_commit_deprecated. + % In 3.6+ it was repurposed to keep the time_seq structure. + % This repurposing and deprecating is done in order to avoid changing db + % record sizes and breaking cross-cluster online upgrades. + time_seq, options = [], compression diff --git a/src/couch/src/couch_db_split.erl b/src/couch/src/couch_db_split.erl index bd325d980..3c01e576b 100644 --- a/src/couch/src/couch_db_split.erl +++ b/src/couch/src/couch_db_split.erl @@ -319,12 +319,14 @@ copy_meta(#state{source_db = SourceDb, targets = Targets} = State) -> RevsLimit = couch_db:get_revs_limit(SourceDb), {SecProps} = couch_db:get_security(SourceDb), PurgeLimit = couch_db:get_purge_infos_limit(SourceDb), + TimeSeq = couch_db:get_time_seq(SourceDb), Targets1 = maps:map( fun(_, #target{db = Db} = T) -> {ok, Db1} = couch_db_engine:set_revs_limit(Db, RevsLimit), {ok, Db2} = couch_db_engine:set_security(Db1, SecProps), {ok, Db3} = couch_db_engine:set_purge_infos_limit(Db2, PurgeLimit), - T#target{db = Db3} + {ok, Db4} = couch_db_engine:set_time_seq(Db3, TimeSeq), + T#target{db = Db4} end, Targets ), diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl index 3f6c8886d..909f1aeb5 100644 --- a/src/couch/src/couch_db_updater.erl +++ 
b/src/couch/src/couch_db_updater.erl @@ -92,6 +92,11 @@ handle_call({set_purge_infos_limit, Limit}, _From, Db) -> {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit), ok = couch_server:db_updated(Db2), {reply, ok, Db2}; +handle_call({set_time_seq, TSeq}, _From, Db) -> + {ok, Db1} = couch_db_engine:set_time_seq(Db, TSeq), + {ok, Db2} = couch_db_engine:commit_data(Db1#db{time_seq = TSeq}), + ok = couch_server:db_updated(Db2), + {reply, ok, Db2}; handle_call({purge_docs, [], _}, _From, Db) -> {reply, {ok, []}, Db}; handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) -> @@ -313,6 +318,7 @@ init_db(DbName, FilePath, EngineState, Options) -> InitDb#db{ committed_update_seq = couch_db_engine:get_update_seq(InitDb), security = couch_db_engine:get_security(InitDb), + time_seq = couch_db_engine:get_time_seq(InitDb), options = lists:keystore(props, 1, NonCreateOpts, {props, DbProps}) }. @@ -876,11 +882,24 @@ apply_purge_reqs([Req | RestReqs], IdFDIs, USeq, Replies) -> NewReplies = [{ok, RemovedRevs} | Replies], apply_purge_reqs(RestReqs, NewIdFDIs, NewUSeq, NewReplies). +update_time_seq(#db{time_seq = TSeq} = Db, Seq) when is_integer(Seq) -> + Timestamp = couch_time_seq:timestamp(), + TSeq1 = couch_time_seq:update(TSeq, Timestamp, Seq), + % Do not expect this structure to update very often, so only + % update the engine if its value changed + case TSeq =:= TSeq1 of + true -> + {ok, Db}; + false -> + Db1 = Db#db{time_seq = TSeq1}, + couch_db_engine:set_time_seq(Db1, TSeq1) + end. + commit_data(Db) -> - {ok, Db1} = couch_db_engine:commit_data(Db), - Db1#db{ - committed_update_seq = couch_db_engine:get_update_seq(Db) - }. + UpdateSeq = couch_db_engine:get_update_seq(Db), + {ok, Db1} = update_time_seq(Db, Db#db.committed_update_seq), + {ok, Db2} = couch_db_engine:commit_data(Db1), + Db2#db{committed_update_seq = UpdateSeq}. 
pair_write_info(Old, New) -> lists:map( diff --git a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl index 007c74d06..a728711d7 100644 --- a/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl +++ b/src/couch/test/eunit/couch_bt_engine_compactor_ev_tests.erl @@ -20,6 +20,8 @@ -define(EV_MOD, couch_bt_engine_compactor_ev). -define(INIT_DOCS, 2500). -define(WRITE_DOCS, 20). +-define(TSEQ_BOGUS_TIME, "3000-01-01T00:00:00Z"). +-define(TSEQ_BOGUS_SEQ, 4242). % The idea behind the tests in this module are to attempt to % cover the number of restart/recopy events during compaction @@ -123,12 +125,16 @@ teardown(Ctx) -> start_empty_db_test(_Event) -> ?EV_MOD:clear(), DbName = ?tempdb(), - {ok, _} = couch_db:create(DbName, [?ADMIN_CTX]), + {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]), + % Create a predictable time seq entry in the far future + % so we can test it's going to be carried along to the + % new compaction target and not get reset + ok = add_bogus_time_seq(Db), DbName. 
start_populated_db_test(Event) -> DbName = start_empty_db_test(Event), - {ok, Db} = couch_db:open_int(DbName, []), + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), try populate_db(Db, ?INIT_DOCS) after @@ -228,7 +234,7 @@ run_static_init(Event, DbName) -> run_static(Event, DbName) -> {ok, ContinueFun} = ?EV_MOD:set_wait(init), {ok, Reason} = ?EV_MOD:set_crash(Event), - {ok, Db} = couch_db:open_int(DbName, []), + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), Ref = couch_db:monitor(Db), {ok, CPid} = couch_db:start_compact(Db), ContinueFun(CPid), @@ -247,7 +253,7 @@ run_dynamic_init(Event, DbName) -> run_dynamic(Event, DbName) -> {ok, ContinueFun} = ?EV_MOD:set_wait(init), {ok, Reason} = ?EV_MOD:set_crash(Event), - {ok, Db} = couch_db:open_int(DbName, []), + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), Ref = couch_db:monitor(Db), {ok, CPid} = couch_db:start_compact(Db), ok = populate_db(Db, 10), @@ -262,7 +268,7 @@ run_dynamic(Event, DbName) -> run_successful_compaction(DbName) -> ?EV_MOD:clear(), {ok, ContinueFun} = ?EV_MOD:set_wait(init), - {ok, Db} = couch_db:open_int(DbName, []), + {ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]), {ok, CPid} = couch_db:start_compact(Db), Ref = erlang:monitor(process, CPid), ContinueFun(CPid), @@ -296,6 +302,14 @@ wait_db_cleared(Db, N) -> end end. +add_bogus_time_seq(Db) -> + TSeq = couch_db:get_time_seq(Db), + % Set some bogus future time we'll check that compaction + % knows how to copy it properly and not reset it during compaction + Time = calendar:rfc3339_to_system_time(?TSEQ_BOGUS_TIME), + TSeq1 = couch_time_seq:update(TSeq, Time, ?TSEQ_BOGUS_SEQ), + ok = couch_db:set_time_seq(Db, TSeq1). + populate_db(_Db, NumDocs) when NumDocs =< 0 -> ok; populate_db(Db, NumDocs) -> @@ -324,7 +338,13 @@ validate_compaction(Db) -> end, {ok, {_, LastCount}} = couch_db:fold_docs(Db, FoldFun, {<<>>, 0}), ?assertEqual(DocCount + DelDocCount, LastCount), - ?assertEqual(NumChanges, LastCount). 
+ ?assertEqual(NumChanges, LastCount), + % If there were any updates to the db check that the time seq structure + % was preserved + TSeq = couch_db:get_time_seq(Db), + #{bins := Bins} = TSeq, + Time = calendar:rfc3339_to_system_time(?TSEQ_BOGUS_TIME), + ?assertEqual({Time, ?TSEQ_BOGUS_SEQ}, hd(Bins)). purge_module() -> case code:which(couch_db_updater) of