This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 3a1619441e35ccc5a82ab3e57532c83ff450cc7a Author: Nick Vatamaniuc <vatam...@gmail.com> AuthorDate: Mon Jul 21 11:56:38 2025 -0400 Add an HTTP API to query and reset time-seq values This is an escape hatch in case something went wrong with time synchronization. Users should always be able to reset the time-seq structure and start from scratch. In fabric, the get* and set* calls are somewhat similar to how db metadata calls like get_revs_limit / set_revs_limit work; however, to keep all the time-seq logic together, they were added to a single `fabric_time_seq` module. To inspect the time-seq structure use `GET $db/_time_seq`. The result contains each shard's time-seq data structure. It's a mapping of formatted time in YYYY-MM-DDTHH:MM:SSZ format to the count of sequence updates which occurred in that time interval for that shard. It may look something like: ```json { "00000000-7fffffff": { "node1@127.0.0.1": [["2025-07-21T16:00:00Z", 1]], "node2@127.0.0.1": [["2025-07-21T16:00:00Z", 1]], "node3@127.0.0.1": [["2025-07-21T16:00:00Z", 1]] }, "80000000-ffffffff": { "node1@127.0.0.1": [["2025-07-21T16:00:00Z", 3]], "node2@127.0.0.1": [["2025-07-21T16:00:00Z", 3]], "node3@127.0.0.1": [["2025-07-21T16:00:00Z", 3]] } } ``` For consistency, the result shape is modeled after the $db/_shards endpoint. The `DELETE $db/_time_seq` API endpoint will reset the data structure. 
After calling it, the result from `GET $db/_time_seq` will look like: ```json { "00000000-7fffffff": { "node1@127.0.0.1": [], "node2@127.0.0.1": [], "node3@127.0.0.1": [] }, "80000000-ffffffff": { "node1@127.0.0.1": [], "node2@127.0.0.1": [], "node3@127.0.0.1": [] } } ``` --- src/chttpd/src/chttpd_db.erl | 25 ++++ src/fabric/src/fabric.erl | 60 +++++++++ src/fabric/src/fabric_rpc.erl | 19 +++ src/fabric/src/fabric_time_seq.erl | 204 ++++++++++++++++++++++++++++++ src/mem3/test/eunit/mem3_reshard_test.erl | 29 ++++- 5 files changed, 336 insertions(+), 1 deletion(-) diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index a43baeae4..a321d5c85 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -870,6 +870,31 @@ db_req(#httpd{method = 'GET', path_parts = [_, <<"_purged_infos_limit">>]} = Req send_json(Req, fabric:get_purge_infos_limit(Db)); db_req(#httpd{path_parts = [_, <<"_purged_infos_limit">>]} = Req, _Db) -> send_method_not_allowed(Req, "GET,PUT"); +db_req(#httpd{method = 'GET', path_parts = [_, <<"_time_seq">>]} = Req, Db) -> + Options = [{user_ctx, Req#httpd.user_ctx}], + case fabric:get_time_seq(Db, Options) of + {ok, #{} = RangeNodeToTSeq} -> + Props = maps:fold( + fun([B, E], #{} = ByNode, Acc) -> + Range = mem3_util:range_to_hex([B, E]), + MapF = fun(_, TSeq) -> couch_time_seq:histogram(TSeq) end, + [{Range, maps:map(MapF, ByNode)} | Acc] + end, + [], + RangeNodeToTSeq + ), + send_json(Req, {lists:sort(Props)}); + Error -> + throw(Error) + end; +db_req(#httpd{method = 'DELETE', path_parts = [_, <<"_time_seq">>]} = Req, Db) -> + Options = [{user_ctx, Req#httpd.user_ctx}], + case fabric:set_time_seq(Db, couch_time_seq:new(), Options) of + ok -> send_json(Req, {[{<<"ok">>, true}]}); + Error -> throw(Error) + end; +db_req(#httpd{path_parts = [_, <<"_time_seq">>]} = Req, _Db) -> + send_method_not_allowed(Req, "GET,DELETE"); % Special case to enable using an unencoded slash in the URL of design docs, % as slashes 
in document IDs must otherwise be URL encoded. db_req( diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl index d552a387d..0a4b4de25 100644 --- a/src/fabric/src/fabric.erl +++ b/src/fabric/src/fabric.erl @@ -69,6 +69,17 @@ db_uuids/1 ]). +% time_seq stuff +-export([ + time_seq_since/2, + time_seq_histogram/1, + time_seq_histogram/2, + get_time_seq/1, + get_time_seq/2, + set_time_seq/2, + set_time_seq/3 +]). + -type dbname() :: (iodata() | tuple()). -type docid() :: iodata(). -type revision() :: {integer(), binary()}. @@ -185,6 +196,39 @@ set_security(DbName, SecObj) -> set_security(DbName, SecObj, Options) -> fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)). +%% @doc get the time seq histograms +-spec time_seq_histogram(dbname()) -> + {ok, #{#shard{} => [[binary() | non_neg_integer()]]}} | {error, any()} | no_return(). +time_seq_histogram(DbName) -> + time_seq_histogram(DbName, [?ADMIN_CTX]). + +%% @doc get the time seq histograms +-spec time_seq_histogram(dbname(), [option()]) -> + {ok, #{#shard{} => [[binary() | non_neg_integer()]]}} | {error, any()} | no_return(). +time_seq_histogram(DbName, Options) -> + fabric_time_seq:histogram(dbname(DbName), opts(Options)). + +%% @doc reset the time seq data structure +-spec set_time_seq(dbname(), couch_time_seq:time_seq()) -> ok | {error, any()}. +set_time_seq(DbName, TSeq) -> + set_time_seq(DbName, TSeq, [?ADMIN_CTX]). + +-spec set_time_seq(dbname(), couch_time_seq:time_seq(), [option()]) -> ok | {error, any()}. +set_time_seq(DbName, TSeq, Options) -> + fabric_time_seq:set_time_seq(dbname(DbName), TSeq, opts(Options)). + +%% @doc get the time seq data structure summary +-spec get_time_seq(dbname()) -> + {ok, #{#shard{} => couch_time_seq:time_seq()}} | {error, any()} | no_return(). +get_time_seq(DbName) -> + get_time_seq(DbName, [?ADMIN_CTX]). 
+ +%% @doc get the time seq data structure summary +-spec get_time_seq(dbname(), [option()]) -> + {ok, #{#shard{} => couch_time_seq:time_seq()}} | {error, any()} | no_return(). +get_time_seq(DbName, Options) -> + fabric_time_seq:get_time_seq(dbname(DbName), opts(Options)). + %% @doc sets the upper bound for the number of stored purge requests -spec set_purge_infos_limit(dbname(), pos_integer(), [option()]) -> ok. set_purge_infos_limit(DbName, Limit, Options) when @@ -637,6 +681,22 @@ dbname(Db) -> db_uuids(DbName) -> fabric_db_uuids:go(dbname(DbName)). +%% @doc get db update sequence before a timestamp +-spec time_seq_since(dbname(), list() | binary() | pos_integer()) -> + {ok, binary()} | {error, any()}. +time_seq_since(DbName, Time) when is_binary(Time) -> + time_seq_since(DbName, binary_to_list(Time)); +time_seq_since(DbName, Time) when is_list(Time) -> + try calendar:rfc3339_to_system_time(Time) of + TimeUnix -> + time_seq_since(DbName, TimeUnix) + catch + _:_ -> + {error, invalid_time_format} + end; +time_seq_since(DbName, Time) when is_integer(Time), Time >= 0 -> + fabric_time_seq:since(dbname(DbName), Time). + name(Thing) -> couch_util:to_binary(Thing). diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 67f529e09..631d1de65 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -54,6 +54,13 @@ get_uuid/1 ]). +-export([ + time_seq_since/2, + time_seq_histogram/2, + get_time_seq/2, + set_time_seq/3 +]). + -include_lib("fabric/include/fabric.hrl"). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). @@ -347,6 +354,18 @@ compact(ShardName, DesignName) -> get_uuid(DbName) -> with_db(DbName, [], {couch_db, get_uuid, []}). +time_seq_since(DbName, Time) -> + with_db(DbName, [], {couch_db, time_seq_since, [Time]}). + +time_seq_histogram(DbName, Options) -> + with_db(DbName, Options, {couch_db, time_seq_histogram, []}). 
+ +get_time_seq(DbName, Options) -> + with_db(DbName, Options, {couch_db, get_time_seq, []}). + +set_time_seq(DbName, TSeq, Options) -> + with_db(DbName, Options, {couch_db, set_time_seq, [TSeq]}). + %% %% internal %% diff --git a/src/fabric/src/fabric_time_seq.erl b/src/fabric/src/fabric_time_seq.erl new file mode 100644 index 000000000..30f2f6594 --- /dev/null +++ b/src/fabric/src/fabric_time_seq.erl @@ -0,0 +1,204 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_time_seq). + +-export([ + since/2, + histogram/2, + set_time_seq/3, + get_time_seq/2 +]). + +-include_lib("mem3/include/mem3.hrl"). + +% Since sequence RPC call. + +% Return a clustered changes sequence which starts before the provided timestamp argument + +since(DbName, Time) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, time_seq_since, [Time]), + RexiMon = fabric_util:create_monitors(Shards), + Acc0 = {fabric_dict:init(Workers, nil), []}, + try fabric_util:recv(Workers, #shard.ref, fun since_handle_message/3, Acc0) of + {timeout, {WorkersDict, _}} -> + DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil), + fabric_util:log_timeout(DefunctWorkers, "time_seq_since"), + {error, timeout}; + Else -> + Else + after + rexi_monitor:stop(RexiMon) + end. 
+ +since_handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {Counters, Resps}) -> + case fabric_ring:node_down(NodeRef, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps}}; + error -> {error, {nodedown, <<"progress not possible">>}} + end; +since_handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) -> + case fabric_ring:handle_error(Shard, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps}}; + error -> {error, Reason} + end; +since_handle_message(Sequence, Shard, {Counters, Resps}) when is_integer(Sequence) -> + case fabric_ring:handle_response(Shard, Sequence, Counters, Resps) of + {ok, {Counters1, Resps1}} -> + {ok, {Counters1, Resps1}}; + {stop, Resps1} -> + Seqs = fabric_dict:fold( + fun(S, Seq, Acc) -> + [{S#shard{ref = undefined}, Seq} | Acc] + end, + [], + Resps1 + ), + {stop, fabric_view_changes:pack_seqs(Seqs)} + end; +since_handle_message(Reason, Shard, {Counters, Resps}) -> + case fabric_ring:handle_error(Shard, Counters, Resps) of + {ok, Counters1} -> {ok, {Counters1, Resps}}; + error -> {error, Reason} + end. + +% Histogram time_seq RPC call + +% Return a per/range, per/node histogram nested map result +% #{Range => Node => Histogram} +% +histogram(DbName, Options) when is_binary(DbName) -> + Shards = mem3:live_shards(DbName, [config:node_name() | nodes()]), + Workers = fabric_util:submit_jobs(Shards, time_seq_histogram, [Options]), + RexiMon = fabric_util:create_monitors(Shards), + Acc0 = {fabric_dict:init(Workers, nil), []}, + try fabric_util:recv(Workers, #shard.ref, fun histogram_handle_message/3, Acc0) of + {timeout, {WorkersDict, _}} -> + DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil), + fabric_util:log_timeout(DefunctWorkers, "time_seq_histogram"), + {error, timeout}; + Else -> + Else + after + rexi_monitor:stop(RexiMon) + end. 
+ +histogram_handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {Cntrs, Res}) -> + case fabric_ring:node_down(NodeRef, Cntrs, Res, [all]) of + {ok, Cntrs1} -> {ok, {Cntrs1, Res}}; + error -> {error, {nodedown, <<"progress not possible">>}} + end; +histogram_handle_message({rexi_EXIT, Reason}, Shard, {Cntrs, Res}) -> + case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of + {ok, Cntrs1} -> {ok, {Cntrs1, Res}}; + error -> {error, Reason} + end; +histogram_handle_message(Hist, Shard, {Cntrs, Res}) when is_list(Hist) -> + case fabric_ring:handle_response(Shard, Hist, Cntrs, Res, [all]) of + {ok, {Cntrs1, Res1}} -> + {ok, {Cntrs1, Res1}}; + {stop, Res1} -> + FoldF = fun(#shard{range = R, node = N}, S, Acc) -> [{R, N, S} | Acc] end, + AsList = fabric_dict:fold(FoldF, [], Res1), + ByRangeKeyF = fun({R, _, _}) -> R end, + ByRangeValF = fun({_, N, S}) -> {N, S} end, + % #{Range1 = [{Node1, Hist1}, {Node2, Hist2}], Range2 => [...], ...} + ByRange = maps:groups_from_list(ByRangeKeyF, ByRangeValF, AsList), + MapF = fun(_, ByNode) -> maps:from_list(ByNode) end, + Result = maps:map(MapF, ByRange), + {stop, Result} + end; +histogram_handle_message(Reason, Shard, {Cntrs, Res}) -> + case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of + {ok, Cntrs1} -> {ok, {Cntrs1, Res}}; + error -> {error, Reason} + end. + +% Set time_seq RPC call + +set_time_seq(DbName, TSeq, Options) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, set_time_seq, [TSeq, Options]), + Handler = fun set_time_seq_handle_message/3, + Acc0 = {Workers, length(Workers) - 1}, + case fabric_util:recv(Workers, #shard.ref, Handler, Acc0) of + {ok, ok} -> + ok; + {timeout, {DefunctWorkers, _}} -> + fabric_util:log_timeout(DefunctWorkers, "set_time_seq"), + {error, timeout}; + Error -> + Error + end. 
+ +set_time_seq_handle_message(ok, _, {_Workers, 0}) -> + {stop, ok}; +set_time_seq_handle_message(ok, Worker, {Workers, Waiting}) -> + {ok, {lists:delete(Worker, Workers), Waiting - 1}}; +set_time_seq_handle_message(Error, _, _Acc) -> + {error, Error}. + +% Get time_seq RPC call + +% Return a per/range, per/node time_seq nested map result +% #{Range => Node => TSeq} +% +get_time_seq(DbName, Options) when is_binary(DbName) -> + Shards = mem3:live_shards(DbName, [config:node_name() | nodes()]), + Workers = fabric_util:submit_jobs(Shards, get_time_seq, [Options]), + RexiMon = fabric_util:create_monitors(Shards), + Acc0 = {fabric_dict:init(Workers, nil), []}, + try fabric_util:recv(Workers, #shard.ref, fun get_time_seq_handle_message/3, Acc0) of + {timeout, {WorkersDict, _}} -> + DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil), + fabric_util:log_timeout(DefunctWorkers, "get_time_seq"), + {error, timeout}; + Else -> + Else + after + rexi_monitor:stop(RexiMon) + end. + +get_time_seq_handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {Cntrs, Res}) -> + case fabric_ring:node_down(NodeRef, Cntrs, Res, [all]) of + {ok, Cntrs1} -> {ok, {Cntrs1, Res}}; + error -> {error, {nodedown, <<"progress not possible">>}} + end; +get_time_seq_handle_message({rexi_EXIT, Reason}, Shard, {Cntrs, Res}) -> + case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of + {ok, Cntrs1} -> {ok, {Cntrs1, Res}}; + error -> {error, Reason} + end; +get_time_seq_handle_message(#{} = TSeq, Shard, {Cntrs, Res}) -> + case fabric_ring:handle_response(Shard, TSeq, Cntrs, Res, [all]) of + {ok, {Cntrs1, Res1}} -> + {ok, {Cntrs1, Res1}}; + {stop, Res1} -> + FoldF = fun(#shard{range = R, node = N}, S, Acc) -> [{R, N, S} | Acc] end, + TSeqList = fabric_dict:fold(FoldF, [], Res1), + ByRangeKeyF = fun({R, _, _}) -> R end, + ByRangeValF = fun({_, N, S}) -> {N, S} end, + % #{Range1 = [{Node1, TSeq1}, {Node2, TSeq2}], Range2 => [...], ...} + TSeqsByRange = maps:groups_from_list(ByRangeKeyF, 
ByRangeValF, TSeqList), + Result = maps:map( + fun(_Range, NodeTimeSeqs) -> + maps:from_list(NodeTimeSeqs) + end, + TSeqsByRange + ), + {stop, Result} + end; +get_time_seq_handle_message(Reason, Shard, {Cntrs, Res}) -> + case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of + {ok, Cntrs1} -> {ok, {Cntrs1, Res}}; + error -> {error, Reason} + end. diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl index 2579649f9..669ac1f58 100644 --- a/src/mem3/test/eunit/mem3_reshard_test.erl +++ b/src/mem3/test/eunit/mem3_reshard_test.erl @@ -103,6 +103,14 @@ split_one_shard(#{db1 := Db}) -> SecObj = {[{<<"foo">>, <<"bar">>}]}, set_security(Db, SecObj), + {ok, TSeqs} = get_time_seq(Db), + Node = config:node_name(), + #{ + [16#00000000, 16#ffffffff] := #{ + Node := #{bins := [{Time1, TSeq1} | _]} + } + } = TSeqs, + % DbInfo is saved after setting metadata bits % as those could bump the update sequence DbInfo0 = get_db_info(Db), @@ -153,7 +161,23 @@ split_one_shard(#{db1 := Db}) -> % Don't forget about the local but don't include internal checkpoints % as some of those are munged and transformed during the split - ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)) + ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)), + + % Verify the time seq structure. There should be one per range. With the + % same time and sequence as the source + {ok, TSeqs1} = get_time_seq(Db), + #{ + [16#00000000, 16#7fffffff] := #{ + Node := #{bins := [{Time2, TSeq2} | _]} + }, + [16#80000000, 16#ffffffff] := #{ + Node := #{bins := [{Time3, TSeq3} | _]} + } + } = TSeqs1, + ?assertEqual(TSeq1, TSeq2), + ?assertEqual(TSeq1, TSeq3), + ?assertEqual(Time1, Time2), + ?assertEqual(Time1, Time3) end)}. % Test to check that shard with high number of purges can be split @@ -781,6 +805,9 @@ set_security(DbName, SecObj) -> get_security(DbName) -> with_proc(fun() -> fabric:get_security(DbName, [?ADMIN_CTX]) end). 
+get_time_seq(DbName) -> + with_proc(fun() -> fabric:get_time_seq(DbName, [?ADMIN_CTX]) end). + get_db_info(DbName) -> with_proc(fun() -> {ok, Info} = fabric:get_db_info(DbName),