This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 3a1619441e35ccc5a82ab3e57532c83ff450cc7a
Author: Nick Vatamaniuc <vatam...@gmail.com>
AuthorDate: Mon Jul 21 11:56:38 2025 -0400

    Add an HTTP API to query and reset time-seq values
    
    This is an escape hatch in case something has gone wrong with time
    synchronization. Users should always be able to reset the time-seq
    structure and start from scratch.
    
    In fabric, the get* and set* calls are similar to db metadata calls like
    get_revs_limit / set_revs_limit; however, to keep all the time-seq logic
    together, they were added to a single `fabric_time_seq` module.
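
    For illustration, a minimal sketch of the fabric-level calls from a remsh
    shell (`<<"mydb">>` is a placeholder database name):

    ```erlang
    %% Fetch the per-shard time-seq structures (admin context by default).
    {ok, TSeqs} = fabric:get_time_seq(<<"mydb">>).
    %% Reset them to a fresh, empty structure.
    ok = fabric:set_time_seq(<<"mydb">>, couch_time_seq:new()).
    ```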
    
    To inspect the time-seq structure use `GET $db/_time_seq`. The result
    includes each shard's time-seq data structure: a mapping of timestamps in
    YYYY-MM-DDTHH:MM:SSZ format to the count of sequence updates which
    occurred in that time interval for that shard. It may look something like:
    
    ```json
    {
        "00000000-7fffffff": {
            "node1@127.0.0.1": [["2025-07-21T16:00:00Z", 1]],
            "node2@127.0.0.1": [["2025-07-21T16:00:00Z", 1]],
            "node3@127.0.0.1": [["2025-07-21T16:00:00Z", 1]]
        },
        "80000000-ffffffff": {
            "node1@127.0.0.1": [["2025-07-21T16:00:00Z", 3]],
            "node2@127.0.0.1": [["2025-07-21T16:00:00Z", 3]],
            "node3@127.0.0.1": [["2025-07-21T16:00:00Z", 3]]
        }
    }
    ```
    
    For consistency, the result shape here is modeled after the `$db/_shards`
    endpoint.
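
    For example, a minimal sketch querying the endpoint with OTP's httpc from
    an Erlang shell (URL and credentials are placeholders):

    ```erlang
    %% Assumes inets is started: application:ensure_all_started(inets).
    Url = "http://127.0.0.1:5984/mydb/_time_seq".
    Auth = {"Authorization", "Basic " ++ base64:encode_to_string("adm:pass")}.
    {ok, {{_, 200, _}, _Hdrs, Body}} = httpc:request(get, {Url, [Auth]}, [], []).
    io:format("~s~n", [Body]).
    ```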
    
    The `DELETE $db/_time_seq` API endpoint will reset the data structure. After
    calling it, the result from `GET $db/_time_seq` will look like:
    
    ```json
    {
        "00000000-7fffffff": {
            "node1@127.0.0.1": [],
            "node2@127.0.0.1": [],
            "node3@127.0.0.1": []
        },
        "80000000-ffffffff": {
            "node1@127.0.0.1": [],
            "node2@127.0.0.1": [],
            "node3@127.0.0.1": []
        }
    }
    ```
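
    For example, resetting it with the same placeholder URL and credentials:

    ```erlang
    %% Reset the time-seq structure; the endpoint replies with {"ok": true}.
    Url = "http://127.0.0.1:5984/mydb/_time_seq".
    Auth = {"Authorization", "Basic " ++ base64:encode_to_string("adm:pass")}.
    {ok, {{_, 200, _}, _, _}} = httpc:request(delete, {Url, [Auth]}, [], []).
    ```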
---
 src/chttpd/src/chttpd_db.erl              |  25 ++++
 src/fabric/src/fabric.erl                 |  60 +++++++++
 src/fabric/src/fabric_rpc.erl             |  19 +++
 src/fabric/src/fabric_time_seq.erl        | 204 ++++++++++++++++++++++++++++++
 src/mem3/test/eunit/mem3_reshard_test.erl |  29 ++++-
 5 files changed, 336 insertions(+), 1 deletion(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index a43baeae4..a321d5c85 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -870,6 +870,31 @@ db_req(#httpd{method = 'GET', path_parts = [_, <<"_purged_infos_limit">>]} = Req
     send_json(Req, fabric:get_purge_infos_limit(Db));
 db_req(#httpd{path_parts = [_, <<"_purged_infos_limit">>]} = Req, _Db) ->
     send_method_not_allowed(Req, "GET,PUT");
+db_req(#httpd{method = 'GET', path_parts = [_, <<"_time_seq">>]} = Req, Db) ->
+    Options = [{user_ctx, Req#httpd.user_ctx}],
+    case fabric:get_time_seq(Db, Options) of
+        {ok, #{} = RangeNodeToTSeq} ->
+            Props = maps:fold(
+                fun([B, E], #{} = ByNode, Acc) ->
+                    Range = mem3_util:range_to_hex([B, E]),
+                    MapF = fun(_, TSeq) -> couch_time_seq:histogram(TSeq) end,
+                    [{Range, maps:map(MapF, ByNode)} | Acc]
+                end,
+                [],
+                RangeNodeToTSeq
+            ),
+            send_json(Req, {lists:sort(Props)});
+        Error ->
+            throw(Error)
+    end;
+db_req(#httpd{method = 'DELETE', path_parts = [_, <<"_time_seq">>]} = Req, Db) ->
+    Options = [{user_ctx, Req#httpd.user_ctx}],
+    case fabric:set_time_seq(Db, couch_time_seq:new(), Options) of
+        ok -> send_json(Req, {[{<<"ok">>, true}]});
+        Error -> throw(Error)
+    end;
+db_req(#httpd{path_parts = [_, <<"_time_seq">>]} = Req, _Db) ->
+    send_method_not_allowed(Req, "GET,DELETE");
 % Special case to enable using an unencoded slash in the URL of design docs,
 % as slashes in document IDs must otherwise be URL encoded.
 db_req(
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index d552a387d..0a4b4de25 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -69,6 +69,17 @@
     db_uuids/1
 ]).
 
+% time_seq stuff
+-export([
+    time_seq_since/2,
+    time_seq_histogram/1,
+    time_seq_histogram/2,
+    get_time_seq/1,
+    get_time_seq/2,
+    set_time_seq/2,
+    set_time_seq/3
+]).
+
 -type dbname() :: (iodata() | tuple()).
 -type docid() :: iodata().
 -type revision() :: {integer(), binary()}.
@@ -185,6 +196,39 @@ set_security(DbName, SecObj) ->
 set_security(DbName, SecObj, Options) ->
     fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)).
 
+%% @doc get the time seq histograms
+-spec time_seq_histogram(dbname()) ->
+    {ok, #{#shard{} => [[binary() | non_neg_integer()]]}} | {error, any()} | no_return().
+time_seq_histogram(DbName) ->
+    time_seq_histogram(DbName, [?ADMIN_CTX]).
+
+%% @doc get the time seq histograms
+-spec time_seq_histogram(dbname(), [option()]) ->
+    {ok, #{#shard{} => [[binary() | non_neg_integer()]]}} | {error, any()} | no_return().
+time_seq_histogram(DbName, Options) ->
+    fabric_time_seq:histogram(dbname(DbName), opts(Options)).
+
+%% @doc reset the time seq data structure
+-spec set_time_seq(dbname(), couch_time_seq:time_seq()) -> ok | {error, any()}.
+set_time_seq(DbName, TSeq) ->
+    set_time_seq(DbName, TSeq, [?ADMIN_CTX]).
+
+-spec set_time_seq(dbname(), couch_time_seq:time_seq(), [option()]) -> ok | {error, any()}.
+set_time_seq(DbName, TSeq, Options) ->
+    fabric_time_seq:set_time_seq(dbname(DbName), TSeq, opts(Options)).
+
+%% @doc get the time seq data structure summary
+-spec get_time_seq(dbname()) ->
+    {ok, #{#shard{} => couch_time_seq:time_seq()}} | {error, any()} | no_return().
+get_time_seq(DbName) ->
+    get_time_seq(DbName, [?ADMIN_CTX]).
+
+%% @doc get the time seq data structure summary
+-spec get_time_seq(dbname(), [option()]) ->
+    {ok, #{#shard{} => couch_time_seq:time_seq()}} | {error, any()} | no_return().
+get_time_seq(DbName, Options) ->
+    fabric_time_seq:get_time_seq(dbname(DbName), opts(Options)).
+
 %% @doc sets the upper bound for the number of stored purge requests
 -spec set_purge_infos_limit(dbname(), pos_integer(), [option()]) -> ok.
 set_purge_infos_limit(DbName, Limit, Options) when
@@ -637,6 +681,22 @@ dbname(Db) ->
 db_uuids(DbName) ->
     fabric_db_uuids:go(dbname(DbName)).
 
+%% @doc get db update sequence before a timestamp
+-spec time_seq_since(dbname(), list() | binary() | pos_integer()) ->
+    {ok, binary()} | {error, any()}.
+time_seq_since(DbName, Time) when is_binary(Time) ->
+    time_seq_since(DbName, binary_to_list(Time));
+time_seq_since(DbName, Time) when is_list(Time) ->
+    try calendar:rfc3339_to_system_time(Time) of
+        TimeUnix ->
+            time_seq_since(DbName, TimeUnix)
+    catch
+        _:_ ->
+            {error, invalid_time_format}
+    end;
+time_seq_since(DbName, Time) when is_integer(Time), Time >= 0 ->
+    fabric_time_seq:since(dbname(DbName), Time).
+
 name(Thing) ->
     couch_util:to_binary(Thing).
 
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 67f529e09..631d1de65 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -54,6 +54,13 @@
     get_uuid/1
 ]).
 
+-export([
+    time_seq_since/2,
+    time_seq_histogram/2,
+    get_time_seq/2,
+    set_time_seq/3
+]).
+
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -347,6 +354,18 @@ compact(ShardName, DesignName) ->
 get_uuid(DbName) ->
     with_db(DbName, [], {couch_db, get_uuid, []}).
 
+time_seq_since(DbName, Time) ->
+    with_db(DbName, [], {couch_db, time_seq_since, [Time]}).
+
+time_seq_histogram(DbName, Options) ->
+    with_db(DbName, Options, {couch_db, time_seq_histogram, []}).
+
+get_time_seq(DbName, Options) ->
+    with_db(DbName, Options, {couch_db, get_time_seq, []}).
+
+set_time_seq(DbName, TSeq, Options) ->
+    with_db(DbName, Options, {couch_db, set_time_seq, [TSeq]}).
+
 %%
 %% internal
 %%
diff --git a/src/fabric/src/fabric_time_seq.erl b/src/fabric/src/fabric_time_seq.erl
new file mode 100644
index 000000000..30f2f6594
--- /dev/null
+++ b/src/fabric/src/fabric_time_seq.erl
@@ -0,0 +1,204 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_time_seq).
+
+-export([
+    since/2,
+    histogram/2,
+    set_time_seq/3,
+    get_time_seq/2
+]).
+
+-include_lib("mem3/include/mem3.hrl").
+
+% Since sequence RPC call.
+
+% Return a clustered changes sequence which starts before the provided timestamp argument
+
+since(DbName, Time) ->
+    Shards = mem3:shards(DbName),
+    Workers = fabric_util:submit_jobs(Shards, time_seq_since, [Time]),
+    RexiMon = fabric_util:create_monitors(Shards),
+    Acc0 = {fabric_dict:init(Workers, nil), []},
+    try fabric_util:recv(Workers, #shard.ref, fun since_handle_message/3, Acc0) of
+        {timeout, {WorkersDict, _}} ->
+            DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
+            fabric_util:log_timeout(DefunctWorkers, "time_seq_since"),
+            {error, timeout};
+        Else ->
+            Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+since_handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {Counters, Resps}) ->
+    case fabric_ring:node_down(NodeRef, Counters, Resps) of
+        {ok, Counters1} -> {ok, {Counters1, Resps}};
+        error -> {error, {nodedown, <<"progress not possible">>}}
+    end;
+since_handle_message({rexi_EXIT, Reason}, Shard, {Counters, Resps}) ->
+    case fabric_ring:handle_error(Shard, Counters, Resps) of
+        {ok, Counters1} -> {ok, {Counters1, Resps}};
+        error -> {error, Reason}
+    end;
+since_handle_message(Sequence, Shard, {Counters, Resps}) when is_integer(Sequence) ->
+    case fabric_ring:handle_response(Shard, Sequence, Counters, Resps) of
+        {ok, {Counters1, Resps1}} ->
+            {ok, {Counters1, Resps1}};
+        {stop, Resps1} ->
+            Seqs = fabric_dict:fold(
+                fun(S, Seq, Acc) ->
+                    [{S#shard{ref = undefined}, Seq} | Acc]
+                end,
+                [],
+                Resps1
+            ),
+            {stop, fabric_view_changes:pack_seqs(Seqs)}
+    end;
+since_handle_message(Reason, Shard, {Counters, Resps}) ->
+    case fabric_ring:handle_error(Shard, Counters, Resps) of
+        {ok, Counters1} -> {ok, {Counters1, Resps}};
+        error -> {error, Reason}
+    end.
+
+% Histogram time_seq RPC call
+
+% Return a per-range, per-node histogram nested map result
+%    #{Range => Node => Histogram}
+%
+histogram(DbName, Options) when is_binary(DbName) ->
+    Shards = mem3:live_shards(DbName, [config:node_name() | nodes()]),
+    Workers = fabric_util:submit_jobs(Shards, time_seq_histogram, [Options]),
+    RexiMon = fabric_util:create_monitors(Shards),
+    Acc0 = {fabric_dict:init(Workers, nil), []},
+    try fabric_util:recv(Workers, #shard.ref, fun histogram_handle_message/3, Acc0) of
+        {timeout, {WorkersDict, _}} ->
+            DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
+            fabric_util:log_timeout(DefunctWorkers, "time_seq_histogram"),
+            {error, timeout};
+        Else ->
+            Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+histogram_handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {Cntrs, Res}) ->
+    case fabric_ring:node_down(NodeRef, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, {nodedown, <<"progress not possible">>}}
+    end;
+histogram_handle_message({rexi_EXIT, Reason}, Shard, {Cntrs, Res}) ->
+    case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, Reason}
+    end;
+histogram_handle_message(Hist, Shard, {Cntrs, Res}) when is_list(Hist) ->
+    case fabric_ring:handle_response(Shard, Hist, Cntrs, Res, [all]) of
+        {ok, {Cntrs1, Res1}} ->
+            {ok, {Cntrs1, Res1}};
+        {stop, Res1} ->
+            FoldF = fun(#shard{range = R, node = N}, S, Acc) -> [{R, N, S} | Acc] end,
+            AsList = fabric_dict:fold(FoldF, [], Res1),
+            ByRangeKeyF = fun({R, _, _}) -> R end,
+            ByRangeValF = fun({_, N, S}) -> {N, S} end,
+            % #{Range1 => [{Node1, Hist1}, {Node2, Hist2}], Range2 => [...], ...}
+            ByRange = maps:groups_from_list(ByRangeKeyF, ByRangeValF, AsList),
+            MapF = fun(_, ByNode) -> maps:from_list(ByNode) end,
+            Result = maps:map(MapF, ByRange),
+            {stop, Result}
+    end;
+histogram_handle_message(Reason, Shard, {Cntrs, Res}) ->
+    case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, Reason}
+    end.
+
+% Set time_seq RPC call
+
+set_time_seq(DbName, TSeq, Options) ->
+    Shards = mem3:shards(DbName),
+    Workers = fabric_util:submit_jobs(Shards, set_time_seq, [TSeq, Options]),
+    Handler = fun set_time_seq_handle_message/3,
+    Acc0 = {Workers, length(Workers) - 1},
+    case fabric_util:recv(Workers, #shard.ref, Handler, Acc0) of
+        {ok, ok} ->
+            ok;
+        {timeout, {DefunctWorkers, _}} ->
+            fabric_util:log_timeout(DefunctWorkers, "set_time_seq"),
+            {error, timeout};
+        Error ->
+            Error
+    end.
+
+set_time_seq_handle_message(ok, _, {_Workers, 0}) ->
+    {stop, ok};
+set_time_seq_handle_message(ok, Worker, {Workers, Waiting}) ->
+    {ok, {lists:delete(Worker, Workers), Waiting - 1}};
+set_time_seq_handle_message(Error, _, _Acc) ->
+    {error, Error}.
+
+% Get time_seq RPC call
+
+% Return a per-range, per-node time_seq nested map result
+%    #{Range => Node => TSeq}
+%
+get_time_seq(DbName, Options) when is_binary(DbName) ->
+    Shards = mem3:live_shards(DbName, [config:node_name() | nodes()]),
+    Workers = fabric_util:submit_jobs(Shards, get_time_seq, [Options]),
+    RexiMon = fabric_util:create_monitors(Shards),
+    Acc0 = {fabric_dict:init(Workers, nil), []},
+    try fabric_util:recv(Workers, #shard.ref, fun get_time_seq_handle_message/3, Acc0) of
+        {timeout, {WorkersDict, _}} ->
+            DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
+            fabric_util:log_timeout(DefunctWorkers, "get_time_seq"),
+            {error, timeout};
+        Else ->
+            Else
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+get_time_seq_handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, {Cntrs, Res}) ->
+    case fabric_ring:node_down(NodeRef, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, {nodedown, <<"progress not possible">>}}
+    end;
+get_time_seq_handle_message({rexi_EXIT, Reason}, Shard, {Cntrs, Res}) ->
+    case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, Reason}
+    end;
+get_time_seq_handle_message(#{} = TSeq, Shard, {Cntrs, Res}) ->
+    case fabric_ring:handle_response(Shard, TSeq, Cntrs, Res, [all]) of
+        {ok, {Cntrs1, Res1}} ->
+            {ok, {Cntrs1, Res1}};
+        {stop, Res1} ->
+            FoldF = fun(#shard{range = R, node = N}, S, Acc) -> [{R, N, S} | Acc] end,
+            TSeqList = fabric_dict:fold(FoldF, [], Res1),
+            ByRangeKeyF = fun({R, _, _}) -> R end,
+            ByRangeValF = fun({_, N, S}) -> {N, S} end,
+            % #{Range1 => [{Node1, TSeq1}, {Node2, TSeq2}], Range2 => [...], ...}
+            TSeqsByRange = maps:groups_from_list(ByRangeKeyF, ByRangeValF, TSeqList),
+            Result = maps:map(
+                fun(_Range, NodeTimeSeqs) ->
+                    maps:from_list(NodeTimeSeqs)
+                end,
+                TSeqsByRange
+            ),
+            {stop, Result}
+    end;
+get_time_seq_handle_message(Reason, Shard, {Cntrs, Res}) ->
+    case fabric_ring:handle_error(Shard, Cntrs, Res, [all]) of
+        {ok, Cntrs1} -> {ok, {Cntrs1, Res}};
+        error -> {error, Reason}
+    end.
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl
index 2579649f9..669ac1f58 100644
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -103,6 +103,14 @@ split_one_shard(#{db1 := Db}) ->
             SecObj = {[{<<"foo">>, <<"bar">>}]},
             set_security(Db, SecObj),
 
+            {ok, TSeqs} = get_time_seq(Db),
+            Node = config:node_name(),
+            #{
+                [16#00000000, 16#ffffffff] := #{
+                    Node := #{bins := [{Time1, TSeq1} | _]}
+                }
+            } = TSeqs,
+
             % DbInfo is saved after setting metadata bits
             % as those could bump the update sequence
             DbInfo0 = get_db_info(Db),
@@ -153,7 +161,23 @@ split_one_shard(#{db1 := Db}) ->
 
             % Don't forget about the local but don't include internal checkpoints
             % as some of those are munged and transformed during the split
-            ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1))
+            ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)),
+
+            % Verify the time seq structure. There should be one per range,
+            % with the same time and sequence as the source.
+            {ok, TSeqs1} = get_time_seq(Db),
+            #{
+                [16#00000000, 16#7fffffff] := #{
+                    Node := #{bins := [{Time2, TSeq2} | _]}
+                },
+                [16#80000000, 16#ffffffff] := #{
+                    Node := #{bins := [{Time3, TSeq3} | _]}
+                }
+            } = TSeqs1,
+            ?assertEqual(TSeq1, TSeq2),
+            ?assertEqual(TSeq1, TSeq3),
+            ?assertEqual(Time1, Time2),
+            ?assertEqual(Time1, Time3)
         end)}.
 
 % Test to check that shard with high number of purges can be split
@@ -781,6 +805,9 @@ set_security(DbName, SecObj) ->
 get_security(DbName) ->
     with_proc(fun() -> fabric:get_security(DbName, [?ADMIN_CTX]) end).
 
+get_time_seq(DbName) ->
+    with_proc(fun() -> fabric:get_time_seq(DbName, [?ADMIN_CTX]) end).
+
 get_db_info(DbName) ->
     with_proc(fun() ->
         {ok, Info} = fabric:get_db_info(DbName),
