This is an automated email from the ASF dual-hosted git repository. jiahuili430 pushed a commit to branch improve-replication in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 36127dd88af04f28ca4934cd5b2cd2c30f1c09a2 Author: Jiahui Li <[email protected]> AuthorDate: Thu Feb 5 21:52:48 2026 -0600 Improve replication `since_seq` parameter --- .../src/couch_replicator_scheduler_job.erl | 9 +- .../eunit/couch_replicator_scheduler_job_tests.erl | 153 +++++++++++++++++++++ 2 files changed, 161 insertions(+), 1 deletion(-) diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl index c921543c3..36bfce930 100644 --- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl +++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl @@ -685,7 +685,14 @@ init_state(Rep) -> Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats), StartSeq1 = get_value(since_seq, Options, StartSeq0), - StartSeq = {0, StartSeq1}, + StartSeq2 = + case StartSeq0 > StartSeq1 of + true -> StartSeq0; + false -> StartSeq1 + end, + io:format("~n +++++++ StartSeq2:~p <- ~p:~p@~B~n", [StartSeq1, ?MODULE, ?FUNCTION_NAME, ?LINE]), + + StartSeq = {0, StartSeq2}, SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ), diff --git a/src/couch_replicator/test/eunit/couch_replicator_scheduler_job_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_scheduler_job_tests.erl new file mode 100644 index 000000000..6b79bf944 --- /dev/null +++ b/src/couch_replicator/test/eunit/couch_replicator_scheduler_job_tests.erl @@ -0,0 +1,153 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_replicator_scheduler_job_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-define(CHANGES_READER, couch_replicator_changes_reader). +-define(DOC1, #{<<"_id">> => <<"doc1">>}). +-define(DOC2, #{<<"_id">> => <<"doc2">>}). +-define(DOC3, #{<<"_id">> => <<"doc3">>}). +-define(DOC4, #{<<"_id">> => <<"doc4">>}). +-define(DOCS, #{<<"docs">> => [?DOC1, ?DOC2, ?DOC3]}). +-define(JSON, {"Content-Type", "application/json"}). + +setup_replicator_db(Prefix) -> + RepDb = + case Prefix of + <<>> -> <<"_replicator">>; + <<_/binary>> -> <<Prefix/binary, "/_replicator">> + end, + Opts = [{q, 1}, {n, 1}, ?ADMIN_CTX], + case fabric:create_db(RepDb, Opts) of + ok -> ok; + {error, file_exists} -> ok + end, + RepDb. + +setup_main_replicator_db() -> + {Ctx, {Source, Target}} = couch_replicator_test_helper:test_setup(), + RepDb = setup_replicator_db(<<>>), + meck:new(?CHANGES_READER, [passthrough]), + {Ctx, {RepDb, Source, Target}}. + +setup_prefixed_replicator_db() -> + {Ctx, {Source, Target}} = couch_replicator_test_helper:test_setup(), + RepDb = setup_replicator_db(?tempdb()), + meck:new(?CHANGES_READER, [passthrough]), + {Ctx, {RepDb, Source, Target}}. + +teardown({Ctx, {RepDb, Source, Target}}) -> + ok = fabric:delete_db(RepDb, [?ADMIN_CTX]), + config:delete("replicator", "update_docs", _Persist = false), + couch_replicator_test_helper:test_teardown({Ctx, {Source, Target}}). + +scheduler_job_replicate_test() -> + { + foreach, + fun setup_main_replicator_db/0, + fun teardown/1, + [ + ?TDEF_FE(t_replicate_without_since_seq), + ?TDEF_FE(t_replicate_with_since_seq) + ] + }. + +t_replicate_without_since_seq({_Ctx, {_RepDb, Source, Target}}) -> + ok = create_docs(Source), + replicate(Source, Target), + ?assertEqual(1, num_calls(read_changes, ['_', 0, '_', '_', '_'])), + + meck:reset(?CHANGES_READER), + ok = create_doc(Source), + replicate(Source, Target), + Changes = changes(Source), + Seq = sequence(?DOC3, Changes), + ?assertEqual(1, num_calls(read_changes, ['_', Seq, '_', '_', '_'])). + +t_replicate_with_since_seq({_Ctx, {_RepDb, Source, Target}}) -> + ok = create_docs(Source), + Changes = changes(Source), + SinceSeq = sequence(?DOC2, Changes), + replicate(Source, Target, SinceSeq), + ?assertEqual(1, num_calls(read_changes, ['_', SinceSeq, '_', '_', '_'])), + + meck:reset(?CHANGES_READER), + ok = create_doc(Source), + replicate(Source, Target), + Seq = sequence(?DOC3, Changes), + ?assertEqual(1, num_calls(read_changes, ['_', Seq, '_', '_', '_'])). + +%%%%%%%%%%%%%%%%%%%% Utility Functions %%%%%%%%%%%%%%%%%%%% +url(UrlPath) -> + binary_to_list(couch_replicator_test_helper:cluster_db_url(UrlPath)). + +create_docs(DbName) -> + case req(post, url(DbName) ++ "/_bulk_docs", ?DOCS) of + {201, _} -> ok; + Error -> error({failed_to_create_docs, DbName, Error}) + end. + +create_doc(DbName) -> + case req(post, url(DbName), ?DOC4) of + {201, _} -> ok; + Error -> error({failed_to_create_doc, DbName, Error}) + end. + +changes(DbName) -> + {200, Res} = req(get, url(DbName) ++ "/_changes"), + ?assert(maps:is_key(<<"last_seq">>, Res)), + ?assert(maps:is_key(<<"pending">>, Res)), + ?assert(maps:is_key(<<"results">>, Res)), + Res. + +sequence(Doc, Changes) -> + #{<<"_id">> := DocId} = Doc, + #{<<"results">> := Results} = Changes, + case lists:search(fun(M) -> maps:get(<<"id">>, M) == DocId end, Results) of + {value, #{<<"seq">> := Seq}} -> Seq; + false -> not_found + end. + +replicate(RepObject) -> + couch_replicator_test_helper:replicate(RepObject). + +replicate(Source, Target) -> + replicate(#{ + <<"source">> => ?l2b(url(Source)), + <<"target">> => ?l2b(url(Target)) + }). + +replicate(Source, Target, SinceSeq) -> + ?debugVal(SinceSeq), + replicate(#{ + <<"source">> => ?l2b(url(Source)), + <<"target">> => ?l2b(url(Target)), + <<"since_seq">> => SinceSeq + }). + +req(Method, Url) -> + Headers = [?JSON], + {ok, Code, _, Res} = test_request:request(Method, Url, Headers), + {Code, jiffy:decode(Res, [return_maps])}. + +req(Method, Url, #{} = Body) -> + req(Method, Url, jiffy:encode(Body)); +req(Method, Url, Body) -> + Headers = [?JSON], + {ok, Code, _, Res} = test_request:request(Method, Url, Headers, Body), + {Code, jiffy:decode(Res, [return_maps])}. + +num_calls(Fun, Args) -> + meck:num_calls(?CHANGES_READER, Fun, Args).
