This is an automated email from the ASF dual-hosted git repository. vatamane pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 1648781261153f686bca4ac7339c7cab5a7823a7 Author: Nick Vatamaniuc <vatam...@gmail.com> AuthorDate: Mon Jul 21 12:31:02 2025 -0400 Time-based since parameter for _changes Use the new time-seq feature to stream changes from before a point in time. This can be used for backups or any case where it helps to associate a range of sequence updates with a time interval. The time-seq exponentially decaying interval rules apply: the further back in time, the less accurate the time intervals will be. The API change consists in making `since` accept a standard time value and streaming the changes starting right before that time value based on the known time-seq intervals. The time format of the since parameter is YYYY-MM-DDTHH:MM:SSZ. It's valid as either an ISO 8601 or an RFC 3339 format. From an API design point of view, this feature can be regarded as an extension to the other `since` values like `now` or `0`. Implementation-wise the change is treated similarly to how we treat the `now` special value: before the changes request starts, we translate the time value to a proper `since` sequence. After that, we continue on with that regular sequence as if nothing special happened. Consequently, the shape of the emitted result is exactly the same as any previous change sequences. This is an extra "plus" for consistency and compatibility. 
To get a feel for the feature, I created a small db and updated it every few hours during the day: `http get $DB/db/_time_seq` ``` { "00000000-ffffffff": { "node1@127.0.0.1": [ ["2025-07-21T01:00:00Z", 15], ["2025-07-21T05:00:00Z", 2], ["2025-07-21T19:00:00Z", 9], ["2025-07-21T20:00:00Z", 5], ["2025-07-21T21:00:00Z", 70], ["2025-07-21T22:00:00Z", 10] ] } } ``` A changes feed with `since=2025-07-21T22:00:00Z` will return documents changed since that last hour only: ``` % http get $DB/db/_changes'?since=2025-07-21T22:00:00Z' | jq -r '.results[].id' 101 102 103 104 105 106 107 108 109 110 ``` Even the somewhat obscure `since_seq` replication parameter should work, so we can replicate from a particular point in time: ``` % http post 'http://adm:pass@localhost:15984/_replicate' \ source:='"http://adm:pass@localhost:15984/db"' \ target:='"http://adm:pass@localhost:15984/tgt"' \ since_seq:='"2025-07-21T22:00:00Z"' { "history": [ { "bulk_get_attempts": 10, "bulk_get_docs": 10, "doc_write_failures": 0, "docs_read": 10, "docs_written": 10, "end_last_seq": "111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews", "end_time": "Mon, 21 Jul 2025 22:11:59 GMT", "missing_checked": 10, "missing_found": 10, "recorded_seq": "111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews", "session_id": "19252b97e34088aeaaa6cde6694a419f", "start_last_seq": "2025-07-21T22:00:00Z", "start_time": "Mon, 21 Jul 2025 22:11:55 GMT" } ], "ok": true, "replication_id_version": 4, "session_id": "19252b97e34088aeaaa6cde6694a419f", "source_last_seq": "111-g1AAAABLeJzLYWBgYMxgTmHgz8tPSTV0MDQy1zMAQsMcoARTIkMeC8N_IMjKYE7MzwUKsacaG6UYGSVhasgCALN1Ews" } ``` The target db now has only documents written in that last hour: ``` % http $DB/tgt/_all_docs | jq -r '.rows[].id' 101 102 103 104 105 106 107 108 109 110 ``` --- src/chttpd/src/chttpd_db.erl | 11 ++- src/chttpd/test/eunit/chttpd_changes_test.erl | 97 ++++++++++++++++++++++++++- 
src/fabric/src/fabric_view_changes.erl | 21 ++++-- 3 files changed, 115 insertions(+), 14 deletions(-) diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index a321d5c85..4bab083d4 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -872,18 +872,17 @@ db_req(#httpd{path_parts = [_, <<"_purged_infos_limit">>]} = Req, _Db) -> send_method_not_allowed(Req, "GET,PUT"); db_req(#httpd{method = 'GET', path_parts = [_, <<"_time_seq">>]} = Req, Db) -> Options = [{user_ctx, Req#httpd.user_ctx}], - case fabric:get_time_seq(Db, Options) of - {ok, #{} = RangeNodeToTSeq} -> + case fabric:time_seq_histogram(Db, Options) of + {ok, #{} = RangeNodeToHist} -> Props = maps:fold( fun([B, E], #{} = ByNode, Acc) -> Range = mem3_util:range_to_hex([B, E]), - MapF = fun(_, TSeq) -> couch_time_seq:histogram(TSeq) end, - [{Range, maps:map(MapF, ByNode)} | Acc] + [{Range, ByNode} | Acc] end, [], - RangeNodeToTSeq + RangeNodeToHist ), - send_json(Req, {lists:sort(Props)}); + send_json(Req, {[{<<"time_seq">>, {lists:sort(Props)}}]}); Error -> throw(Error) end; diff --git a/src/chttpd/test/eunit/chttpd_changes_test.erl b/src/chttpd/test/eunit/chttpd_changes_test.erl index b93667752..a89842ae3 100644 --- a/src/chttpd/test/eunit/chttpd_changes_test.erl +++ b/src/chttpd/test/eunit/chttpd_changes_test.erl @@ -83,7 +83,8 @@ changes_test_() -> ?TDEF(t_selector_filter), ?TDEF(t_design_filter), ?TDEF(t_docs_id_filter), - ?TDEF(t_docs_id_filter_over_limit) + ?TDEF(t_docs_id_filter_over_limit), + ?TDEF(t_time_since) ]) }. @@ -109,7 +110,8 @@ changes_q8_test_() -> ?TDEF(t_reverse_limit_one_q8), ?TDEF(t_selector_filter), ?TDEF(t_design_filter), - ?TDEF(t_docs_id_filter_q8) + ?TDEF(t_docs_id_filter_q8), + ?TDEF(t_time_since_q8) ]) }. @@ -493,6 +495,97 @@ t_docs_id_filter_over_limit({_, DbUrl}) -> Rows ). 
+t_time_since({_, DbUrl}) -> + Params1 = "?since=3000-02-03T04:05:00Z", + {Seq1, Pending1, Rows1} = changes(DbUrl, Params1), + ?assertEqual(8, Seq1), + ?assertEqual(0, Pending1), + ?assertEqual([], Rows1, "Far into the future, we should get nothing"), + + Params2 = "?since=2025-07-01T04:05:00Z", + Res2 = {Seq2, Pending2, Rows2} = changes(DbUrl, Params2), + ?assertEqual(8, Seq2), + ?assertEqual(0, Pending2), + ?assertEqual( + [ + {6, {?DOC1, <<"2-c">>}, ?LEAFREV}, + {7, {?DOC3, <<"2-b">>}, ?DELETED}, + {8, {?DDOC2, <<"2-c">>}, ?LEAFREV} + ], + Rows2, + "Before the feature is released, should same as since=0" + ), + + ?assertEqual( + Res2, + changes(DbUrl, "?since=0"), + "Expected the same result as from '?since=0'" + ), + + Params3 = "?since=2025-01-01Txx:yy:00Z", + {InvalCode, InvalRes} = reqraw(get, DbUrl ++ "/_changes" ++ Params3), + ?assertEqual(400, InvalCode), + ?assertMatch( + #{ + <<"error">> := <<"bad_request">>, + <<"reason">> := <<"invalid_time_format">> + }, + json(InvalRes), + "Invalid time" + ), + + {TSeqCode, TSeqRes} = reqraw(get, DbUrl ++ "/_time_seq"), + ?assertEqual(200, TSeqCode), + Year = integer_to_binary(element(1, date())), + Node = atom_to_binary(config:node_name()), + ?assertMatch( + #{ + <<"time_seq">> := #{ + <<"00000000-ffffffff">> := #{ + Node := [[<<Year:4/binary, _/binary>>, 8]] + } + } + }, + json(TSeqRes), + "Check we can get _time_seq via http API" + ), + + {TSeqResetCode, TSeqResetRes} = reqraw(delete, DbUrl ++ "/_time_seq"), + ?assertEqual(200, TSeqResetCode), + ?assertMatch(#{<<"ok">> := true}, json(TSeqResetRes), "Reset the time seq info"), + + {TSeqCodeVerify, TSeqResVerify} = reqraw(get, DbUrl ++ "/_time_seq"), + ?assertEqual(200, TSeqCodeVerify), + ?assertMatch( + #{<<"time_seq">> := #{<<"00000000-ffffffff">> := #{Node := []}}}, + json(TSeqResVerify) + ), + + ?assertEqual(Res2, changes(DbUrl, Params2), "Changes feeds still work after a reset"). 
+ +t_time_since_q8({_, DbUrl}) -> + Params1 = "?since=3000-02-03T04:05:00Z", + {Seq1, Pending1, Rows1} = changes(DbUrl, Params1), + ?assertEqual(8, Seq1), + ?assertEqual(0, Pending1), + ?assertEqual([], Rows1, "Far into the future, we should get nothing"), + + Params2 = "?since=2025-01-01T04:05:00Z", + {Seq2, Pending2, Rows2} = changes(DbUrl, Params2), + {Seqs, Revs, _Deleted} = lists:unzip3(Rows2), + ?assertEqual(8, Seq2), + ?assertEqual(0, Pending2), + ?assertEqual( + [ + {?DDOC2, <<"2-c">>}, + {?DOC1, <<"2-c">>}, + {?DOC3, <<"2-b">>} + ], + lists:sort(Revs), + "Before the feature is released, should get everything" + ), + ?assertEqual(Seqs, lists:sort(Seqs)). + t_js_filter({_, DbUrl}) -> DDocId = "_design/filters", FilterFun = <<"function(doc, req) {return (doc._id == 'doc3')}">>, diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl index f6695f163..73c05163d 100644 --- a/src/fabric/src/fabric_view_changes.erl +++ b/src/fabric/src/fabric_view_changes.erl @@ -27,13 +27,15 @@ -import(fabric_db_update_listener, [wait_db_updated/1]). +-define(RFC3339_TIME, [_, _, _, _, $-, _, _, $-, _, _, $T, _, _, $:, _, _, $:, _, _, $Z]). 
+ go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse Feed == "longpoll" orelse Feed == "eventsource" -> Args = make_changes_args(Options), Since = get_start_seq(DbName, Args), - case validate_start_seq(DbName, Since) of + case validate_start_seq(Since) of ok -> {ok, Acc} = Callback(start, Acc0), {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback), @@ -69,7 +71,7 @@ go(DbName, Feed, Options, Callback, Acc0) when go(DbName, "normal", Options, Callback, Acc0) -> Args = make_changes_args(Options), Since = get_start_seq(DbName, Args), - case validate_start_seq(DbName, Since) of + case validate_start_seq(Since) of ok -> {ok, Acc} = Callback(start, Acc0), {ok, Collector} = send_changes( @@ -369,6 +371,11 @@ get_start_seq(DbName, #changes_args{dir = Dir, since = Since}) when -> {ok, Info} = fabric:get_db_info(DbName), couch_util:get_value(update_seq, Info); +get_start_seq(DbName, #changes_args{dir = fwd, since = ?RFC3339_TIME = Since}) -> + case fabric:time_seq_since(DbName, Since) of + {ok, SinceSeq} -> SinceSeq; + {error, Error} -> {error, Error} + end; get_start_seq(_DbName, #changes_args{dir = fwd, since = Since}) -> Since. @@ -728,11 +735,11 @@ make_split_seq({Num, Uuid, Node}, RepCount) when RepCount > 1 -> make_split_seq(Seq, _) -> Seq. -validate_start_seq(_DbName, 0) -> +validate_start_seq(0) -> ok; -validate_start_seq(_DbName, "0") -> +validate_start_seq("0") -> ok; -validate_start_seq(_DbName, Seq) when is_list(Seq) orelse is_binary(Seq) -> +validate_start_seq(Seq) when is_list(Seq) orelse is_binary(Seq) -> try Opaque = unpack_seq_regex_match(Seq), unpack_seq_decode_term(Opaque), @@ -741,7 +748,9 @@ validate_start_seq(_DbName, Seq) when is_list(Seq) orelse is_binary(Seq) -> _:_ -> Reason = <<"Malformed sequence supplied in 'since' parameter.">>, {error, {bad_request, Reason}} - end. + end; +validate_start_seq({error, Error}) -> + {error, {bad_request, Error}}. 
get_changes_epoch() -> case application:get_env(fabric, changes_epoch) of