This is an automated email from the ASF dual-hosted git repository.

jiahuili430 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/main by this push:
     new 437a586f7 Improve replication `since_seq` parameter
437a586f7 is described below

commit 437a586f7c5aa4b03663a886bff7201b5b86b8a5
Author: Jiahui Li <[email protected]>
AuthorDate: Thu Feb 5 21:52:48 2026 -0600

    Improve replication `since_seq` parameter
    
    - If there is no checkpoint and no `since_seq`, then replicate from scratch.
    - If there is no checkpoint but `since_seq` is defined, then replicate with
      the `since_seq` field.
    - If both checkpoint and `since_seq` exist, use the checkpoint to replicate.
    - If the request includes a `since_seq` field, the replication ID will be
      changed.
---
 src/couch_replicator/src/couch_replicator_ids.erl  |  10 +-
 .../src/couch_replicator_scheduler_job.erl         |  11 +-
 .../couch_replicator_attachments_too_large.erl     |   4 +-
 .../eunit/couch_replicator_large_atts_tests.erl    |   2 +-
 .../eunit/couch_replicator_scheduler_job_tests.erl | 330 +++++++++++++++++++++
 .../test/eunit/couch_replicator_test_helper.erl    |   3 +-
 6 files changed, 354 insertions(+), 6 deletions(-)

diff --git a/src/couch_replicator/src/couch_replicator_ids.erl b/src/couch_replicator/src/couch_replicator_ids.erl
index 0c381b009..9916421e7 100644
--- a/src/couch_replicator/src/couch_replicator_ids.erl
+++ b/src/couch_replicator/src/couch_replicator_ids.erl
@@ -120,7 +120,15 @@ maybe_append_filters(
                 true ->
                     [<<"winning_revs_only">>]
             end,
-    couch_util:to_hex(couch_hash:md5_hash(?term_to_bin(Base3))).
+    Base4 =
+        Base3 ++
+            case couch_util:get_value(since_seq, Options) of
+                undefined ->
+                    [];
+                SinceSeq ->
+                    [SinceSeq]
+            end,
+    couch_util:to_hex(couch_hash:md5_hash(?term_to_bin(Base4))).
 
 maybe_append_options(Options, RepOptions) ->
     lists:foldl(
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index c921543c3..55a28ad7e 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -684,7 +684,16 @@ init_state(Rep) ->
         end,
     Stats = couch_replicator_stats:max_stats(ArgStats1, HistoryStats),
 
-    StartSeq1 = get_value(since_seq, Options, StartSeq0),
+    StartSeq1 =
+        case StartSeq0 of
+            0 ->
+                % Checkpoint doesn't exist, use the `since_seq` to replicate;
+                % If `since_seq` is not defined, replicate from scratch.
+                get_value(since_seq, Options, 0);
+            _ ->
+                % Replicate with the checkpoint and ignore `since_seq`.
+                StartSeq0
+        end,
     StartSeq = {0, StartSeq1},
 
     SourceSeq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl b/src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl
index 0b4360378..ef9390227 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_attachments_too_large.erl
@@ -32,13 +32,13 @@ attachment_too_large_replication_test_() ->
 should_succeed({_Ctx, {Source, Target}}) ->
     create_doc_with_attachment(Source, <<"doc">>, 1000),
     config:set("couchdb", "max_attachment_size", "1000", _Persist = false),
-    ok = replicate(Source, Target),
+    {ok, _} = replicate(Source, Target),
     ?assertEqual(ok, compare(Source, Target)).
 
 should_fail({_Ctx, {Source, Target}}) ->
     create_doc_with_attachment(Source, <<"doc">>, 1000),
     config:set("couchdb", "max_attachment_size", "999", _Persist = false),
-    ok = replicate(Source, Target),
+    {ok, _} = replicate(Source, Target),
     ?assertError({not_found, <<"doc">>}, compare(Source, Target)).
 
 create_doc_with_attachment(DbName, DocId, AttSize) ->
diff --git a/src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl
index e60e3be5d..166b36c6c 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_large_atts_tests.erl
@@ -44,7 +44,7 @@ large_atts_test_() ->
 
 should_replicate_atts({_Ctx, {Source, Target}}) ->
     populate_db(Source, ?DOCS_COUNT),
-    ?assertEqual(ok, replicate(Source, Target)),
+    ?assertMatch({ok, _}, replicate(Source, Target)),
     couch_replicator_test_helper:cluster_compare_dbs(Source, Target).
 
 populate_db(DbName, DocCount) ->
diff --git a/src/couch_replicator/test/eunit/couch_replicator_scheduler_job_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_scheduler_job_tests.erl
new file mode 100644
index 000000000..a3f477836
--- /dev/null
+++ b/src/couch_replicator/test/eunit/couch_replicator_scheduler_job_tests.erl
@@ -0,0 +1,330 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_scheduler_job_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-define(CHANGES_READER, couch_replicator_changes_reader).
+-define(DOC(Id), #{<<"_id">> => integer_to_binary(Id)}).
+-define(DOCS(StartId, StopId), #{
+    <<"docs">> =>
+        [
+            #{<<"_id">> => integer_to_binary(Id)}
+         || Id <- lists:seq(StartId, StopId)
+        ]
+}).
+-define(JSON, {"Content-Type", "application/json"}).
+
+setup_replicator_db(Prefix) ->
+    RepDb =
+        case Prefix of
+            <<>> -> <<"_replicator">>;
+            <<_/binary>> -> <<Prefix/binary, "/_replicator">>
+        end,
+    Opts = [{q, 1}, {n, 1}, ?ADMIN_CTX],
+    case fabric:create_db(RepDb, Opts) of
+        ok -> ok;
+        {error, file_exists} -> ok
+    end,
+    RepDb.
+
+setup_main_replicator_db() ->
+    {Ctx, {Source, Target}} = couch_replicator_test_helper:test_setup(),
+    RepDb = setup_replicator_db(<<>>),
+    meck:new(?CHANGES_READER, [passthrough]),
+    {Ctx, {RepDb, Source, Target}}.
+
+setup_prefixed_replicator_db() ->
+    {Ctx, {Source, Target}} = couch_replicator_test_helper:test_setup(),
+    RepDb = setup_replicator_db(?tempdb()),
+    meck:new(?CHANGES_READER, [passthrough]),
+    {Ctx, {RepDb, Source, Target}}.
+
+teardown({Ctx, {RepDb, Source, Target}}) ->
+    ok = fabric:delete_db(RepDb, [?ADMIN_CTX]),
+    config:delete("replicator", "update_docs", _Persist = false),
+    couch_replicator_test_helper:test_teardown({Ctx, {Source, Target}}).
+
+scheduler_job_replicate_test_() ->
+    {
+        foreach,
+        fun setup_main_replicator_db/0,
+        fun teardown/1,
+        [
+            ?TDEF_FE(t_replicate_without_since_seq),
+            ?TDEF_FE(t_replicate_with_since_seq_only),
+            ?TDEF_FE(t_replicate_with_checkpoint_and_since_seq)
+        ]
+    }.
+
+scheduler_job_main_db_test_() ->
+    {
+        foreach,
+        fun setup_main_replicator_db/0,
+        fun teardown/1,
+        [
+            ?TDEF_FE(t_replicator_without_since_seq, 15),
+            ?TDEF_FE(t_replicator_with_since_seq_only, 15),
+            ?TDEF_FE(t_replicator_with_checkpoint_and_since_seq, 25)
+        ]
+    }.
+
+scheduler_job_prefixed_db_test_() ->
+    {
+        foreach,
+        fun setup_prefixed_replicator_db/0,
+        fun teardown/1,
+        [
+            ?TDEF_FE(t_replicator_without_since_seq, 15),
+            ?TDEF_FE(t_replicator_with_since_seq_only, 15),
+            ?TDEF_FE(t_replicator_with_checkpoint_and_since_seq, 25)
+        ]
+    }.
+
+t_replicate_without_since_seq({_Ctx, {_RepDb, Source, Target}}) ->
+    ok = create_docs(Source, ?DOCS(1, 3)),
+    {ok, RepId1} = replicate(Source, Target),
+    ?assertEqual(1, num_calls(read_changes, ['_', 0, '_', '_', '_'])),
+    ?assertMatch(#{<<"total_rows">> := 3}, all_docs(Target)),
+
+    meck:reset(?CHANGES_READER),
+    ok = create_doc(Source, ?DOC(4)),
+    {ok, RepId2} = replicate(Source, Target),
+    Changes = changes(Source),
+    Seq = sequence(?DOC(3), Changes),
+    ?assertEqual(RepId1, RepId2),
+    ?assertEqual(1, num_calls(read_changes, ['_', Seq, '_', '_', '_'])),
+    ?assertMatch(#{<<"total_rows">> := 4}, all_docs(Target)).
+
+t_replicate_with_since_seq_only({_Ctx, {_RepDb, Source, Target}}) ->
+    ok = create_docs(Source, ?DOCS(1, 3)),
+    Changes = changes(Source),
+    SinceSeq = sequence(?DOC(2), Changes),
+    replicate(Source, Target, SinceSeq),
+    ?assertEqual(1, num_calls(read_changes, ['_', SinceSeq, '_', '_', '_'])),
+    ?assertMatch(#{<<"total_rows">> := 1}, all_docs(Target)).
+
+t_replicate_with_checkpoint_and_since_seq({_Ctx, {_RepDb, Source, Target}}) ->
+    ok = create_docs(Source, ?DOCS(1, 3)),
+    Changes = changes(Source),
+    SinceSeq = sequence(?DOC(2), Changes),
+    {ok, RepId1} = replicate(Source, Target, SinceSeq),
+    ?assertEqual(1, num_calls(read_changes, ['_', SinceSeq, '_', '_', '_'])),
+    ?assertMatch(#{<<"total_rows">> := 1}, all_docs(Target)),
+
+    % Replicate with the checkpoint and ignore `since_seq`.
+    meck:reset(?CHANGES_READER),
+    ok = create_doc(Source, ?DOC(4)),
+    {ok, RepId2} = replicate(Source, Target, SinceSeq),
+    Seq = sequence(?DOC(3), Changes),
+    ?assertEqual(RepId1, RepId2),
+    ?assertEqual(1, num_calls(read_changes, ['_', Seq, '_', '_', '_'])),
+    ?assertMatch(#{<<"total_rows">> := 2}, all_docs(Target)),
+
+    % No checkpoint exists, so replicate with the `since_seq`.
+    meck:reset(?CHANGES_READER),
+    ok = create_docs(Source, ?DOCS(5, 7)),
+    Changes1 = changes(Source),
+    SinceSeq1 = sequence(?DOC(6), Changes1),
+    {ok, RepId3} = replicate(Source, Target, SinceSeq1),
+    ?assertNotEqual(RepId2, RepId3),
+    ?assertEqual(1, num_calls(read_changes, ['_', SinceSeq1, '_', '_', '_'])),
+    ?assertMatch(#{<<"total_rows">> := 3}, all_docs(Target)).
+
+t_replicator_without_since_seq({_Ctx, {RepDb, Source, Target}}) ->
+    ok = create_docs(Source, ?DOCS(1, 3)),
+    SourceUrl = couch_replicator_test_helper:cluster_db_url(Source),
+    TargetUrl = couch_replicator_test_helper:cluster_db_url(Target),
+    RepDoc = #{<<"source">> => SourceUrl, <<"target">> => TargetUrl},
+    {RepDocId, RepId1} = persistent_replicate(RepDb, RepDoc),
+    ?assertEqual(1, num_calls(read_changes, ['_', 0, '_', '_', '_'])),
+    ?assertMatch(#{<<"total_rows">> := 3}, all_docs(Target)),
+    ?assertEqual(null, scheduler_docs_id(RepDb, RepDocId)),
+
+    meck:reset(?CHANGES_READER),
+    ok = create_doc(Source, ?DOC(4)),
+    {RepDocId2, RepId2} = persistent_replicate(RepDb, RepDoc),
+    Changes = changes(Source),
+    Seq = sequence(?DOC(3), Changes),
+    ?assertEqual(RepId1, RepId2),
+    ?assertEqual(1, num_calls(read_changes, ['_', Seq, '_', '_', '_'])),
+    ?assertMatch(#{<<"total_rows">> := 4}, all_docs(Target)),
+    ?assertEqual(null, scheduler_docs_id(RepDb, RepDocId2)).
+
+t_replicator_with_since_seq_only({_Ctx, {RepDb, Source, Target}}) ->
+    ok = create_docs(Source, ?DOCS(1, 3)),
+    Changes = changes(Source),
+    SinceSeq = sequence(?DOC(2), Changes),
+    SourceUrl = couch_replicator_test_helper:cluster_db_url(Source),
+    TargetUrl = couch_replicator_test_helper:cluster_db_url(Target),
+    RepDoc = #{<<"source">> => SourceUrl, <<"target">> => TargetUrl, <<"since_seq">> => SinceSeq},
+    {RepDocId, _} = persistent_replicate(RepDb, RepDoc),
+    ?assertEqual(1, num_calls(read_changes, ['_', SinceSeq, '_', '_', '_'])),
+    ?assertMatch(#{<<"total_rows">> := 1}, all_docs(Target)),
+    ?assertEqual(null, scheduler_docs_id(RepDb, RepDocId)).
+
+t_replicator_with_checkpoint_and_since_seq({_Ctx, {RepDb, Source, Target}}) ->
+    ok = create_docs(Source, ?DOCS(1, 3)),
+    Changes = changes(Source),
+    SinceSeq = sequence(?DOC(2), Changes),
+    SourceUrl = couch_replicator_test_helper:cluster_db_url(Source),
+    TargetUrl = couch_replicator_test_helper:cluster_db_url(Target),
+    RepDoc = #{<<"source">> => SourceUrl, <<"target">> => TargetUrl, <<"since_seq">> => SinceSeq},
+    {RepDocId, RepId1} = persistent_replicate(RepDb, RepDoc),
+    ?assertEqual(1, num_calls(read_changes, ['_', SinceSeq, '_', '_', '_'])),
+    ?assertMatch(#{<<"total_rows">> := 1}, all_docs(Target)),
+    ?assertEqual(null, scheduler_docs_id(RepDb, RepDocId)),
+
+    % Old replication: checkpoint exists, so replicate with the checkpoint.
+    meck:reset(?CHANGES_READER),
+    ok = create_doc(Source, ?DOC(4)),
+    {RepDocId2, RepId2} = persistent_replicate(RepDb, RepDoc),
+    Seq = sequence(?DOC(3), Changes),
+    ?assertEqual(RepId1, RepId2),
+    ?assertEqual(1, num_calls(read_changes, ['_', Seq, '_', '_', '_'])),
+    ?assertMatch(#{<<"total_rows">> := 2}, all_docs(Target)),
+    ?assertEqual(null, scheduler_docs_id(RepDb, RepDocId2)),
+
+    % New replication: no checkpoint exists, so replicate with the `since_seq`.
+    meck:reset(?CHANGES_READER),
+    ok = create_docs(Source, ?DOCS(5, 7)),
+    Changes1 = changes(Source),
+    SinceSeq1 = sequence(?DOC(6), Changes1),
+    RepDoc1 = #{<<"source">> => SourceUrl, <<"target">> => TargetUrl, <<"since_seq">> => SinceSeq1},
+    {RepDocId3, RepId3} = persistent_replicate(RepDb, RepDoc1),
+    ?assertNotEqual(RepId2, RepId3),
+    ?assertEqual(1, num_calls(read_changes, ['_', SinceSeq1, '_', '_', '_'])),
+    ?assertMatch(#{<<"total_rows">> := 3}, all_docs(Target)),
+    ?assertEqual(null, scheduler_docs_id(RepDb, RepDocId3)).
+
+%%%%%%%%%%%%%%%%%%%% Utility Functions %%%%%%%%%%%%%%%%%%%%
+url(UrlPath) ->
+    binary_to_list(couch_replicator_test_helper:cluster_db_url(UrlPath)).
+
+create_docs(DbName, Docs) ->
+    case req(post, url(DbName) ++ "/_bulk_docs", Docs) of
+        {201, _} -> ok;
+        Error -> error({failed_to_create_docs, DbName, Error})
+    end.
+
+create_doc(DbName, Doc) ->
+    case req(post, url(DbName), Doc) of
+        {201, _} -> ok;
+        Error -> error({failed_to_create_doc, DbName, Error})
+    end.
+
+all_docs(DbName) ->
+    {200, Res} = req(get, url(DbName) ++ "/_all_docs"),
+    ?assert(maps:is_key(<<"offset">>, Res)),
+    ?assert(maps:is_key(<<"rows">>, Res)),
+    ?assert(maps:is_key(<<"total_rows">>, Res)),
+    Res.
+
+changes(DbName) ->
+    {200, Res} = req(get, url(DbName) ++ "/_changes"),
+    ?assert(maps:is_key(<<"last_seq">>, Res)),
+    ?assert(maps:is_key(<<"pending">>, Res)),
+    ?assert(maps:is_key(<<"results">>, Res)),
+    Res.
+
+sequence(Doc, Changes) ->
+    #{<<"_id">> := DocId} = Doc,
+    #{<<"results">> := Results} = Changes,
+    case lists:search(fun(M) -> maps:get(<<"id">>, M) == DocId end, Results) of
+        {value, #{<<"seq">> := Seq}} -> Seq;
+        false -> not_found
+    end.
+
+replicate(RepObject) ->
+    couch_replicator_test_helper:replicate(RepObject).
+
+replicate(Source, Target) ->
+    replicate(#{
+        <<"source">> => ?l2b(url(Source)),
+        <<"target">> => ?l2b(url(Target))
+    }).
+
+replicate(Source, Target, SinceSeq) ->
+    replicate(#{
+        <<"source">> => ?l2b(url(Source)),
+        <<"target">> => ?l2b(url(Target)),
+        <<"since_seq">> => SinceSeq
+    }).
+
+persistent_replicate(RepDb, RepDoc) ->
+    RepDocId = ?docid(),
+    rep_toggle(stop),
+    RepDocUrl = rep_doc_url(RepDb, RepDocId),
+    {201, _} = req(put, RepDocUrl, RepDoc),
+    RepId = scheduler_docs_id(RepDb, RepDocId),
+    rep_toggle(start),
+    ok = test_util:wait(
+        fun() ->
+            case req(get, RepDocUrl) of
+                {200, #{<<"_replication_state">> := <<"completed">>}} -> ok;
+                {_, #{}} -> wait
+            end
+        end,
+        7000,
+        1000
+    ),
+    {RepDocId, RepId}.
+
+rep_toggle(start) ->
+    config:set("replicator", "max_jobs", "500", false);
+rep_toggle(stop) ->
+    config:set("replicator", "max_jobs", "0", false).
+
+rep_doc_url(RepDb, DocId) when is_binary(RepDb) ->
+    rep_doc_url(binary_to_list(RepDb), DocId);
+rep_doc_url(RepDb, DocId) when is_binary(DocId) ->
+    rep_doc_url(RepDb, binary_to_list(DocId));
+rep_doc_url(RepDb, DocId) when is_list(RepDb), is_list(DocId) ->
+    UrlQuotedRepDb = mochiweb_util:quote_plus(RepDb),
+    url(UrlQuotedRepDb ++ "/" ++ DocId).
+
+scheduler_docs_id(RepDb, RepDocId) ->
+    RepDocIdBin = ?l2b(RepDocId),
+    SchedulerDocsUrl =
+        case RepDb of
+            <<"_replicator">> -> url(<<"/_scheduler/docs">>);
+            <<_/binary>> -> url(<<"/_scheduler/docs/", RepDb/binary>>)
+        end,
+    Docs = test_util:wait(
+        fun() ->
+            case req(get, SchedulerDocsUrl) of
+                {200, #{<<"docs">> := [_ | _] = Docs}} -> Docs;
+                {200, #{<<"docs">> := []}} -> wait
+            end
+        end,
+        7000,
+        1000
+    ),
+    [RepId] = [Id || #{<<"doc_id">> := DocId, <<"id">> := Id} <- Docs, DocId =:= RepDocIdBin],
+    RepId.
+
+req(Method, Url) ->
+    Headers = [?JSON],
+    {ok, Code, _, Res} = test_request:request(Method, Url, Headers),
+    {Code, jiffy:decode(Res, [return_maps])}.
+
+req(Method, Url, #{} = Body) ->
+    req(Method, Url, jiffy:encode(Body));
+req(Method, Url, Body) ->
+    Headers = [?JSON],
+    {ok, Code, _, Res} = test_request:request(Method, Url, Headers, Body),
+    {Code, jiffy:decode(Res, [return_maps])}.
+
+num_calls(Fun, Args) ->
+    meck:num_calls(?CHANGES_READER, Fun, Args).
diff --git a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
index 5f2cfa25f..39f803cf5 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_test_helper.erl
@@ -186,7 +186,8 @@ replicate({[_ | _]} = RepObject) ->
         {'DOWN', MonRef, process, Pid, _} ->
             ok
     end,
-    ok = couch_replicator_scheduler:remove_job(Rep#rep.id).
+    ok = couch_replicator_scheduler:remove_job(Rep#rep.id),
+    {ok, Rep#rep.id}.
 
 setup_db() ->
     DbName = ?tempdb(),

Reply via email to