This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 1648781261153f686bca4ac7339c7cab5a7823a7
Author: Nick Vatamaniuc <vatam...@gmail.com>
AuthorDate: Mon Jul 21 12:31:02 2025 -0400

    Time-based since parameter for _changes
    
    Use the new time-seq feature to stream changes from before a point in time.
    
    This can be used for backups, or for any case where it helps to associate a
    range of sequence updates with a time interval. The time-seq exponentially
    decaying interval rules apply: the further back in time, the less accurate
    the time intervals will be.
    
    The API change consists of making `since` accept a standard time value and
    streaming the changes starting right before that time value, based on the
    known time-seq intervals. The time format of the `since` parameter is
    YYYY-MM-DDTHH:MM:SSZ, which is valid as both an ISO 8601 and an RFC 3339
    timestamp.
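    
    As an aside (this is not code from the commit; the patch below recognizes the
    timestamp shape with a simple pattern match): a `since` value in this format
    also round-trips through stock OTP's `calendar` module, for example in an
    Erlang shell:
    
    ```
    %% Illustration only, using standard OTP calendar functions (not CouchDB
    %% code): the accepted timestamp format parses and prints cleanly.
    1> calendar:rfc3339_to_system_time("2025-07-21T22:00:00Z").
    1753135200
    2> calendar:system_time_to_rfc3339(1753135200, [{offset, "Z"}]).
    "2025-07-21T22:00:00Z"
    ```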
    
    From an API design point of view, this feature can be regarded as an
    extension of the other `since` values like `now` or `0`.
    
    Implementation-wise, the change is handled similarly to how we treat the
    `now` special value: before the changes request starts, we translate the
    time value to a proper `since` sequence. After that, we continue with that
    regular sequence as if nothing special happened. Consequently, the shape of
    the emitted result is exactly the same as for any other change sequence.
    This is an extra "plus" for consistency and compatibility.
    
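    A simplified sketch of that up-front translation step (hypothetical names:
    `resolve_since/2` and `looks_like_rfc3339/1` stand in for the real
    `?RFC3339_TIME` pattern match in `get_start_seq/2`, shown in the
    fabric_view_changes.erl hunk below):
    
    ```
    %% Simplified sketch only; resolve_since/2 and looks_like_rfc3339/1 are
    %% hypothetical names. fabric:time_seq_since/2 is the call from the patch.
    resolve_since(DbName, Since) ->
        case looks_like_rfc3339(Since) of
            true ->
                %% Translate the timestamp to a regular update sequence once,
                %% up front; a failure surfaces later as a 400 bad_request.
                case fabric:time_seq_since(DbName, Since) of
                    {ok, SinceSeq} -> SinceSeq;
                    {error, Error} -> {error, Error}
                end;
            false ->
                %% All other since values pass through unchanged.
                Since
        end.
    ```
    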
    To get a feel for the feature, I created a small db and updated it every few
    hours during the day:
    
    `http get $DB/db/_time_seq`
    
    ```
    {
        "time_seq": {
            "00000000-ffffffff": {
                "node1@127.0.0.1": [
                    ["2025-07-21T01:00:00Z", 15],
                    ["2025-07-21T05:00:00Z", 2],
                    ["2025-07-21T19:00:00Z", 9],
                    ["2025-07-21T20:00:00Z", 5],
                    ["2025-07-21T21:00:00Z", 70],
                    ["2025-07-21T22:00:00Z", 10]
                ]
            }
        }
    }
    ```
    
    A changes feed with `since=2025-07-21T22:00:00Z` will return only the
    documents changed in that last hour:
    
    ```
    % http get $DB/db/_changes'?since=2025-07-21T22:00:00Z' | jq -r '.results[].id'
    
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    ```
    
    Even the somewhat obscure `since_seq` replication parameter should work, so
    we can replicate from a particular point in time:
    
    ```
    % http post 'http://adm:pass@localhost:15984/_replicate' \
      source:='"http://adm:pass@localhost:15984/db"' \
      target:='"http://adm:pass@localhost:15984/tgt"' \
      since_seq:='"2025-07-21T22:00:00Z"'
    
    {
        "history": [
            {
                "bulk_get_attempts": 10,
                "bulk_get_docs": 10,
                "doc_write_failures": 0,
                "docs_read": 10,
                "docs_written": 10,
                "end_last_seq": 
"111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews",
                "end_time": "Mon, 21 Jul 2025 22:11:59 GMT",
                "missing_checked": 10,
                "missing_found": 10,
                "recorded_seq": 
"111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews",
                "session_id": "19252b97e34088aeaaa6cde6694a419f",
                "start_last_seq": "2025-07-21T22:00:00Z",
                "start_time": "Mon, 21 Jul 2025 22:11:55 GMT"
            }
        ],
        "ok": true,
        "replication_id_version": 4,
        "session_id": "19252b97e34088aeaaa6cde6694a419f",
        "source_last_seq": 
"111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews"
    }
    ```
    
    The target db now has only documents written in that last hour:
    
    ```
    % http $DB/tgt/_all_docs | jq -r '.rows[].id'
    
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    ```
---
 src/chttpd/src/chttpd_db.erl                  | 11 ++-
 src/chttpd/test/eunit/chttpd_changes_test.erl | 97 ++++++++++++++++++++++++++-
 src/fabric/src/fabric_view_changes.erl        | 21 ++++--
 3 files changed, 115 insertions(+), 14 deletions(-)

diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index a321d5c85..4bab083d4 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -872,18 +872,17 @@ db_req(#httpd{path_parts = [_, <<"_purged_infos_limit">>]} = Req, _Db) ->
     send_method_not_allowed(Req, "GET,PUT");
 db_req(#httpd{method = 'GET', path_parts = [_, <<"_time_seq">>]} = Req, Db) ->
     Options = [{user_ctx, Req#httpd.user_ctx}],
-    case fabric:get_time_seq(Db, Options) of
-        {ok, #{} = RangeNodeToTSeq} ->
+    case fabric:time_seq_histogram(Db, Options) of
+        {ok, #{} = RangeNodeToHist} ->
             Props = maps:fold(
                 fun([B, E], #{} = ByNode, Acc) ->
                     Range = mem3_util:range_to_hex([B, E]),
-                    MapF = fun(_, TSeq) -> couch_time_seq:histogram(TSeq) end,
-                    [{Range, maps:map(MapF, ByNode)} | Acc]
+                    [{Range, ByNode} | Acc]
                 end,
                 [],
-                RangeNodeToTSeq
+                RangeNodeToHist
             ),
-            send_json(Req, {lists:sort(Props)});
+            send_json(Req, {[{<<"time_seq">>, {lists:sort(Props)}}]});
         Error ->
             throw(Error)
     end;
diff --git a/src/chttpd/test/eunit/chttpd_changes_test.erl b/src/chttpd/test/eunit/chttpd_changes_test.erl
index b93667752..a89842ae3 100644
--- a/src/chttpd/test/eunit/chttpd_changes_test.erl
+++ b/src/chttpd/test/eunit/chttpd_changes_test.erl
@@ -83,7 +83,8 @@ changes_test_() ->
             ?TDEF(t_selector_filter),
             ?TDEF(t_design_filter),
             ?TDEF(t_docs_id_filter),
-            ?TDEF(t_docs_id_filter_over_limit)
+            ?TDEF(t_docs_id_filter_over_limit),
+            ?TDEF(t_time_since)
         ])
     }.
 
@@ -109,7 +110,8 @@ changes_q8_test_() ->
             ?TDEF(t_reverse_limit_one_q8),
             ?TDEF(t_selector_filter),
             ?TDEF(t_design_filter),
-            ?TDEF(t_docs_id_filter_q8)
+            ?TDEF(t_docs_id_filter_q8),
+            ?TDEF(t_time_since_q8)
         ])
     }.
 
@@ -493,6 +495,97 @@ t_docs_id_filter_over_limit({_, DbUrl}) ->
         Rows
     ).
 
+t_time_since({_, DbUrl}) ->
+    Params1 = "?since=3000-02-03T04:05:00Z",
+    {Seq1, Pending1, Rows1} = changes(DbUrl, Params1),
+    ?assertEqual(8, Seq1),
+    ?assertEqual(0, Pending1),
+    ?assertEqual([], Rows1, "Far into the future, we should get nothing"),
+
+    Params2 = "?since=2025-07-01T04:05:00Z",
+    Res2 = {Seq2, Pending2, Rows2} = changes(DbUrl, Params2),
+    ?assertEqual(8, Seq2),
+    ?assertEqual(0, Pending2),
+    ?assertEqual(
+        [
+            {6, {?DOC1, <<"2-c">>}, ?LEAFREV},
+            {7, {?DOC3, <<"2-b">>}, ?DELETED},
+            {8, {?DDOC2, <<"2-c">>}, ?LEAFREV}
+        ],
+        Rows2,
+        "Before the feature is released, should same as since=0"
+    ),
+
+    ?assertEqual(
+        Res2,
+        changes(DbUrl, "?since=0"),
+        "Expected the same result as from '?since=0'"
+    ),
+
+    Params3 = "?since=2025-01-01Txx:yy:00Z",
+    {InvalCode, InvalRes} = reqraw(get, DbUrl ++ "/_changes" ++ Params3),
+    ?assertEqual(400, InvalCode),
+    ?assertMatch(
+        #{
+            <<"error">> := <<"bad_request">>,
+            <<"reason">> := <<"invalid_time_format">>
+        },
+        json(InvalRes),
+        "Invalid time"
+    ),
+
+    {TSeqCode, TSeqRes} = reqraw(get, DbUrl ++ "/_time_seq"),
+    ?assertEqual(200, TSeqCode),
+    Year = integer_to_binary(element(1, date())),
+    Node = atom_to_binary(config:node_name()),
+    ?assertMatch(
+        #{
+            <<"time_seq">> := #{
+                <<"00000000-ffffffff">> := #{
+                    Node := [[<<Year:4/binary, _/binary>>, 8]]
+                }
+            }
+        },
+        json(TSeqRes),
+        "Check we can get _time_seq via http API"
+    ),
+
+    {TSeqResetCode, TSeqResetRes} = reqraw(delete, DbUrl ++ "/_time_seq"),
+    ?assertEqual(200, TSeqResetCode),
+    ?assertMatch(#{<<"ok">> := true}, json(TSeqResetRes), "Reset the time seq 
info"),
+
+    {TSeqCodeVerify, TSeqResVerify} = reqraw(get, DbUrl ++ "/_time_seq"),
+    ?assertEqual(200, TSeqCodeVerify),
+    ?assertMatch(
+        #{<<"time_seq">> := #{<<"00000000-ffffffff">> := #{Node := []}}},
+        json(TSeqResVerify)
+    ),
+
+    ?assertEqual(Res2, changes(DbUrl, Params2), "Changes feeds still work after a reset").
+
+t_time_since_q8({_, DbUrl}) ->
+    Params1 = "?since=3000-02-03T04:05:00Z",
+    {Seq1, Pending1, Rows1} = changes(DbUrl, Params1),
+    ?assertEqual(8, Seq1),
+    ?assertEqual(0, Pending1),
+    ?assertEqual([], Rows1, "Far into the future, we should get nothing"),
+
+    Params2 = "?since=2025-01-01T04:05:00Z",
+    {Seq2, Pending2, Rows2} = changes(DbUrl, Params2),
+    {Seqs, Revs, _Deleted} = lists:unzip3(Rows2),
+    ?assertEqual(8, Seq2),
+    ?assertEqual(0, Pending2),
+    ?assertEqual(
+        [
+            {?DDOC2, <<"2-c">>},
+            {?DOC1, <<"2-c">>},
+            {?DOC3, <<"2-b">>}
+        ],
+        lists:sort(Revs),
+        "Before the feature is released, should get everything"
+    ),
+    ?assertEqual(Seqs, lists:sort(Seqs)).
+
 t_js_filter({_, DbUrl}) ->
     DDocId = "_design/filters",
     FilterFun = <<"function(doc, req) {return (doc._id == 'doc3')}">>,
diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl
index f6695f163..73c05163d 100644
--- a/src/fabric/src/fabric_view_changes.erl
+++ b/src/fabric/src/fabric_view_changes.erl
@@ -27,13 +27,15 @@
 
 -import(fabric_db_update_listener, [wait_db_updated/1]).
 
+-define(RFC3339_TIME, [_, _, _, _, $-, _, _, $-, _, _, $T, _, _, $:, _, _, $:, _, _, $Z]).
+
 go(DbName, Feed, Options, Callback, Acc0) when
     Feed == "continuous" orelse
         Feed == "longpoll" orelse Feed == "eventsource"
 ->
     Args = make_changes_args(Options),
     Since = get_start_seq(DbName, Args),
-    case validate_start_seq(DbName, Since) of
+    case validate_start_seq(Since) of
         ok ->
             {ok, Acc} = Callback(start, Acc0),
             {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
@@ -69,7 +71,7 @@ go(DbName, Feed, Options, Callback, Acc0) when
 go(DbName, "normal", Options, Callback, Acc0) ->
     Args = make_changes_args(Options),
     Since = get_start_seq(DbName, Args),
-    case validate_start_seq(DbName, Since) of
+    case validate_start_seq(Since) of
         ok ->
             {ok, Acc} = Callback(start, Acc0),
             {ok, Collector} = send_changes(
@@ -369,6 +371,11 @@ get_start_seq(DbName, #changes_args{dir = Dir, since = Since}) when
 ->
     {ok, Info} = fabric:get_db_info(DbName),
     couch_util:get_value(update_seq, Info);
+get_start_seq(DbName, #changes_args{dir = fwd, since = ?RFC3339_TIME = Since}) ->
+    case fabric:time_seq_since(DbName, Since) of
+        {ok, SinceSeq} -> SinceSeq;
+        {error, Error} -> {error, Error}
+    end;
 get_start_seq(_DbName, #changes_args{dir = fwd, since = Since}) ->
     Since.
 
@@ -728,11 +735,11 @@ make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 ->
 make_split_seq(Seq, _) ->
     Seq.
 
-validate_start_seq(_DbName, 0) ->
+validate_start_seq(0) ->
     ok;
-validate_start_seq(_DbName, "0") ->
+validate_start_seq("0") ->
     ok;
-validate_start_seq(_DbName, Seq) when is_list(Seq) orelse is_binary(Seq) ->
+validate_start_seq(Seq) when is_list(Seq) orelse is_binary(Seq) ->
     try
         Opaque = unpack_seq_regex_match(Seq),
         unpack_seq_decode_term(Opaque),
@@ -741,7 +748,9 @@ validate_start_seq(_DbName, Seq) when is_list(Seq) orelse is_binary(Seq) ->
         _:_ ->
             Reason = <<"Malformed sequence supplied in 'since' parameter.">>,
             {error, {bad_request, Reason}}
-    end.
+    end;
+validate_start_seq({error, Error}) ->
+    {error, {bad_request, Error}}.
 
 get_changes_epoch() ->
     case application:get_env(fabric, changes_epoch) of
