This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f843db2a fix(duplication): `min_checkpoint_decree` was reset to the 
invalid decree after replica server was restarted (#2097)
2f843db2a is described below

commit 2f843db2a5dde325822bf423db7f848ec186a3d6
Author: Dan Wang <[email protected]>
AuthorDate: Mon Aug 19 19:54:20 2024 +0800

    fix(duplication): `min_checkpoint_decree` was reset to the invalid decree after replica server was restarted (#2097)
    
    https://github.com/apache/incubator-pegasus/issues/2098
    
    After the replica server was restarted, `min_checkpoint_decree` would be
    reset to the invalid decree (-1). This is wrong for an existing duplication
    that is still in the status of `DS_PREPARE`, where the replica continues
    to generate checkpoints for the full duplication. A duplication in the
    status of `DS_PREPARE` calls `replica_duplicator::prepare_dup()`, which
    asserts that `min_checkpoint_decree` must be greater than 0. Therefore,
    the replica server would always exit abnormally due to the failed assertion.
---
 src/replica/duplication/replica_duplicator.cpp     | 106 +++++++++++++--------
 src/replica/duplication/replica_duplicator.h       |   4 +-
 .../test/dup_replica_http_service_test.cpp         |   3 +-
 .../test/duplication_sync_timer_test.cpp           |  36 ++++---
 .../duplication/test/mutation_batch_test.cpp       |  82 ++++++++--------
 .../test/replica_duplicator_manager_test.cpp       |  64 +++++++------
 .../duplication/test/replica_duplicator_test.cpp   |  31 ++++--
 7 files changed, 194 insertions(+), 132 deletions(-)

diff --git a/src/replica/duplication/replica_duplicator.cpp 
b/src/replica/duplication/replica_duplicator.cpp
index 6bb75b92c..d19033d4e 100644
--- a/src/replica/duplication/replica_duplicator.cpp
+++ b/src/replica/duplication/replica_duplicator.cpp
@@ -59,42 +59,66 @@ replica_duplicator::replica_duplicator(const 
duplication_entry &ent, replica *r)
       _stub(r->get_replica_stub()),
       METRIC_VAR_INIT_replica(dup_confirmed_mutations)
 {
+    // Ensure that the checkpoint decree is at least 1. Otherwise, the 
checkpoint could not be
+    // created in time for empty replica; in consequence, the remote cluster 
would inevitably
+    // fail to pull the checkpoint files.
+    //
+    // The max decree in rocksdb memtable (the last applied decree) is 
considered as the min
+    // decree that should be covered by the checkpoint, which means currently 
all of the data
+    // in current rocksdb should be included into the created checkpoint.
+    //
+    // `_min_checkpoint_decree` is not persisted into zk. Once replica server 
was restarted,
+    // it would be reset to the decree that is applied most recently.
+    const auto last_applied_decree = _replica->last_applied_decree();
+    _min_checkpoint_decree = std::max(last_applied_decree, 
static_cast<decree>(1));
+    LOG_INFO_PREFIX("initialize checkpoint decree: min_checkpoint_decree={}, "
+                    "last_committed_decree={}, last_applied_decree={}, "
+                    "last_flushed_decree={}, last_durable_decree={}, "
+                    "plog_max_decree_on_disk={}, plog_max_commit_on_disk={}",
+                    _min_checkpoint_decree,
+                    _replica->last_committed_decree(),
+                    last_applied_decree,
+                    _replica->last_flushed_decree(),
+                    _replica->last_durable_decree(),
+                    _replica->private_log()->max_decree_on_disk(),
+                    _replica->private_log()->max_commit_on_disk());
+
     _status = ent.status;
 
-    auto it = ent.progress.find(get_gpid().get_partition_index());
+    const auto it = ent.progress.find(get_gpid().get_partition_index());
+    CHECK_PREFIX_MSG(it != ent.progress.end(),
+                     "partition({}) not found in duplication progress: "
+                     "app_name={}, dup_id={}, remote_cluster_name={}, 
remote_app_name={}",
+                     get_gpid(),
+                     r->get_app_info()->app_name,
+                     id(),
+                     _remote_cluster_name,
+                     _remote_app_name);
+
+    // Initial progress would be `invalid_decree` which was synced from meta 
server
+    // immediately after the duplication was created.
+    // See `init_progress()` in 
`meta_duplication_service::new_dup_from_init()`.
+    //
+    // _progress.last_decree would be used to update the state in meta server.
+    // See `replica_duplicator_manager::get_duplication_confirms_to_update()`.
     if (it->second == invalid_decree) {
-        // Ensure that the checkpoint decree is at least 1. Otherwise, the 
checkpoint could not be
-        // created in time for empty replica; in consequence, the remote 
cluster would inevitably
-        // fail to pull the checkpoint files.
-        //
-        // The max decree in rocksdb memtable (the last applied decree) is 
considered as the min
-        // decree that should be covered by the checkpoint, which means 
currently all of the data
-        // in current rocksdb should be included into the created checkpoint.
-        //
-        // TODO(jiashuo1): _min_checkpoint_decree hasn't be ready to persist 
zk, so if master
-        // restart, the value will be reset to 0.
-        const auto last_applied_decree = _replica->last_applied_decree();
-        _min_checkpoint_decree = std::max(last_applied_decree, 
static_cast<decree>(1));
-        _progress.last_decree = last_applied_decree;
-        LOG_INFO_PREFIX("initialize checkpoint decree: 
min_checkpoint_decree={}, "
-                        "last_committed_decree={}, last_applied_decree={}, "
-                        "last_flushed_decree={}, last_durable_decree={}, "
-                        "plog_max_decree_on_disk={}, 
plog_max_commit_on_disk={}",
-                        _min_checkpoint_decree,
-                        _replica->last_committed_decree(),
-                        last_applied_decree,
-                        _replica->last_flushed_decree(),
-                        _replica->last_durable_decree(),
-                        _replica->private_log()->max_decree_on_disk(),
-                        _replica->private_log()->max_commit_on_disk());
-
+        _progress.last_decree = _min_checkpoint_decree;
     } else {
         _progress.last_decree = _progress.confirmed_decree = it->second;
     }
-    LOG_INFO_PREFIX("initialize replica_duplicator[{}] [dupid:{}, 
meta_confirmed_decree:{}]",
-                    duplication_status_to_string(_status),
+
+    LOG_INFO_PREFIX("initialize replica_duplicator: app_name={}, dup_id={}, "
+                    "remote_cluster_name={}, remote_app_name={}, status={}, "
+                    "replica_confirmed_decree={}, meta_persisted_decree={}/{}",
+                    r->get_app_info()->app_name,
                     id(),
-                    it->second);
+                    _remote_cluster_name,
+                    _remote_app_name,
+                    duplication_status_to_string(_status),
+                    _progress.last_decree,
+                    it->second,
+                    _progress.confirmed_decree);
+
     
thread_pool(LPC_REPLICATION_LOW).task_tracker(tracker()).thread_hash(get_gpid().thread_hash());
 
     if (_status == duplication_status::DS_PREPARE) {
@@ -123,7 +147,8 @@ void replica_duplicator::prepare_dup()
 
 void replica_duplicator::start_dup_log()
 {
-    LOG_INFO_PREFIX("starting duplication {} [last_decree: {}, 
confirmed_decree: {}]",
+    LOG_INFO_PREFIX("starting duplication: {}, replica_confirmed_decree={}, "
+                    "meta_persisted_decree={}",
                     to_string(),
                     _progress.last_decree,
                     _progress.confirmed_decree);
@@ -261,17 +286,16 @@ error_s replica_duplicator::update_progress(const 
duplication_progress &p)
 
 void replica_duplicator::verify_start_decree(decree start_decree)
 {
-    decree confirmed_decree = progress().confirmed_decree;
-    decree last_decree = progress().last_decree;
-    decree max_gced_decree = get_max_gced_decree();
-    CHECK_LT_MSG(max_gced_decree,
-                 start_decree,
-                 "the logs haven't yet duplicated were accidentally truncated "
-                 "[max_gced_decree: {}, start_decree: {}, confirmed_decree: 
{}, last_decree: {}]",
-                 max_gced_decree,
-                 start_decree,
-                 confirmed_decree,
-                 last_decree);
+    const auto max_gced_decree = get_max_gced_decree();
+    CHECK_LT_PREFIX_MSG(
+        max_gced_decree,
+        start_decree,
+        "the logs haven't yet duplicated were accidentally truncated 
[max_gced_decree: {}, "
+        "start_decree: {}, replica_confirmed_decree: {}, 
meta_persisted_decree: {}]",
+        max_gced_decree,
+        start_decree,
+        progress().last_decree,
+        progress().confirmed_decree);
 }
 
 decree replica_duplicator::get_max_gced_decree() const
diff --git a/src/replica/duplication/replica_duplicator.h 
b/src/replica/duplication/replica_duplicator.h
index 9a8deed0d..1516ec520 100644
--- a/src/replica/duplication/replica_duplicator.h
+++ b/src/replica/duplication/replica_duplicator.h
@@ -157,8 +157,8 @@ public:
 
         {
             zauto_read_lock l(_lock);
-            JSON_ENCODE_OBJ(writer, confirmed_decree, _progress.last_decree);
-            JSON_ENCODE_OBJ(writer, persisted_decree, 
_progress.confirmed_decree);
+            JSON_ENCODE_OBJ(writer, replica_confirmed_decree, 
_progress.last_decree);
+            JSON_ENCODE_OBJ(writer, meta_persisted_decree, 
_progress.confirmed_decree);
         }
 
         writer.EndObject();
diff --git a/src/replica/duplication/test/dup_replica_http_service_test.cpp 
b/src/replica/duplication/test/dup_replica_http_service_test.cpp
index 43a0a3515..5fbffbd49 100644
--- a/src/replica/duplication/test/dup_replica_http_service_test.cpp
+++ b/src/replica/duplication/test/dup_replica_http_service_test.cpp
@@ -42,7 +42,8 @@ INSTANTIATE_TEST_SUITE_P(, dup_replica_http_service_test, 
::testing::Values(fals
 
 TEST_P(dup_replica_http_service_test, query_duplication_handler)
 {
-    auto pri = stub->add_primary_replica(1, 1);
+    auto *pri = stub->add_primary_replica(1, 1);
+    pri->init_private_log(pri->dir());
 
     // primary confirmed_decree
     duplication_entry ent;
diff --git a/src/replica/duplication/test/duplication_sync_timer_test.cpp 
b/src/replica/duplication/test/duplication_sync_timer_test.cpp
index 0f7855ed7..2714af6bd 100644
--- a/src/replica/duplication/test/duplication_sync_timer_test.cpp
+++ b/src/replica/duplication/test/duplication_sync_timer_test.cpp
@@ -52,7 +52,8 @@ public:
         static const std::string kTestRemoteAppName = "temp";
 
         // replica: {app_id:2, partition_id:1, duplications:{}}
-        stub->add_primary_replica(2, 1);
+        auto *rep = stub->add_primary_replica(2, 1);
+        rep->init_private_log(rep->dir());
         ASSERT_NE(stub->find_replica(2, 1), nullptr);
 
         // appid:2 -> dupid:1
@@ -82,15 +83,16 @@ public:
     {
         int total_app_num = 4;
         for (int appid = 1; appid <= total_app_num; appid++) {
-            auto r = stub->add_non_primary_replica(appid, 1);
+            auto *rep = stub->add_non_primary_replica(appid, 1);
+            rep->init_private_log(rep->dir());
 
             // trigger duplication sync on partition 1
             duplication_entry ent;
             ent.dupid = 1;
-            ent.progress[r->get_gpid().get_partition_index()] = 1000;
+            ent.progress[rep->get_gpid().get_partition_index()] = 1000;
             ent.status = duplication_status::DS_PAUSE;
-            auto dup = std::make_unique<replica_duplicator>(ent, r);
-            add_dup(r, std::move(dup));
+            auto dup = std::make_unique<replica_duplicator>(ent, rep);
+            add_dup(rep, std::move(dup));
         }
 
         RPC_MOCKING(duplication_sync_rpc)
@@ -164,7 +166,8 @@ public:
         std::map<int32_t, std::map<dupid_t, duplication_entry>> dup_map;
         for (int32_t appid = 1; appid <= 10; appid++) {
             for (int partition_id = 0; partition_id < 3; partition_id++) {
-                stub->add_primary_replica(appid, partition_id);
+                auto *rep = stub->add_primary_replica(appid, partition_id);
+                rep->init_private_log(rep->dir());
             }
         }
 
@@ -254,19 +257,20 @@ public:
     void test_update_confirmed_points()
     {
         for (int32_t appid = 1; appid <= 10; appid++) {
-            stub->add_primary_replica(appid, 1);
+            auto *rep = stub->add_primary_replica(appid, 1);
+            rep->init_private_log(rep->dir());
         }
 
         for (int appid = 1; appid <= 3; appid++) {
-            auto r = stub->find_replica(appid, 1);
+            auto *rep = stub->find_replica(appid, 1);
 
             duplication_entry ent;
             ent.dupid = 1;
             ent.status = duplication_status::DS_PAUSE;
-            ent.progress[r->get_gpid().get_partition_index()] = 0;
-            auto dup = std::make_unique<replica_duplicator>(ent, r);
+            ent.progress[rep->get_gpid().get_partition_index()] = 0;
+            auto dup = std::make_unique<replica_duplicator>(ent, rep);
             
dup->update_progress(dup->progress().set_last_decree(3).set_confirmed_decree(1));
-            add_dup(r, std::move(dup));
+            add_dup(rep, std::move(dup));
         }
 
         duplication_entry ent;
@@ -280,8 +284,8 @@ public:
         dup_sync->on_duplication_sync_reply(ERR_OK, resp);
 
         for (int appid = 1; appid <= 3; appid++) {
-            auto r = stub->find_replica(appid, 1);
-            auto dup = find_dup(r, 1);
+            auto *rep = stub->find_replica(appid, 1);
+            auto *dup = find_dup(rep, 1);
 
             ASSERT_EQ(3, dup->progress().confirmed_decree);
         }
@@ -294,7 +298,8 @@ public:
         // 10 primaries
         int appid = 1;
         for (int partition_id = 0; partition_id < 10; partition_id++) {
-            stub->add_primary_replica(appid, partition_id);
+            auto *r = stub->add_primary_replica(appid, partition_id);
+            r->init_private_log(r->dir());
         }
 
         duplication_entry ent;
@@ -353,7 +358,8 @@ public:
     // there must be some internal problems.
     void test_receive_illegal_duplication_status()
     {
-        stub->add_primary_replica(1, 0);
+        auto *rep = stub->add_primary_replica(1, 0);
+        rep->init_private_log(rep->dir());
 
         duplication_entry ent;
         ent.dupid = 2;
diff --git a/src/replica/duplication/test/mutation_batch_test.cpp 
b/src/replica/duplication/test/mutation_batch_test.cpp
index c865de20b..862e6276b 100644
--- a/src/replica/duplication/test/mutation_batch_test.cpp
+++ b/src/replica/duplication/test/mutation_batch_test.cpp
@@ -34,6 +34,7 @@
 #include "replica/duplication/replica_duplicator.h"
 #include "replica/mutation.h"
 #include "replica/prepare_list.h"
+#include "replica/test/mock_utils.h"
 #include "runtime/task/task_code.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
@@ -42,24 +43,29 @@ namespace dsn::replication {
 
 class mutation_batch_test : public duplication_test_base
 {
-public:
-    mutation_batch_test() : _duplicator(create_test_duplicator(0)), 
_batcher(_duplicator.get()) {}
+protected:
+    mutation_batch_test()
+    {
+        _replica->init_private_log(_replica->dir());
+        _duplicator = create_test_duplicator(0);
+        _batcher = std::make_unique<mutation_batch>(_duplicator.get());
+    }
 
-    void reset_buffer(const decree last_commit, const decree start, const 
decree end)
+    void reset_buffer(const decree last_commit, const decree start, const 
decree end) const
     {
-        _batcher._mutation_buffer->reset(last_commit);
-        _batcher._mutation_buffer->_start_decree = start;
-        _batcher._mutation_buffer->_end_decree = end;
+        _batcher->_mutation_buffer->reset(last_commit);
+        _batcher->_mutation_buffer->_start_decree = start;
+        _batcher->_mutation_buffer->_end_decree = end;
     }
 
-    void commit_buffer(const decree current_decree)
+    void commit_buffer(const decree current_decree) const
     {
-        _batcher._mutation_buffer->commit(current_decree, 
COMMIT_TO_DECREE_HARD);
+        _batcher->_mutation_buffer->commit(current_decree, 
COMMIT_TO_DECREE_HARD);
     }
 
-    void check_mutation_contents(const std::vector<std::string> 
&expected_mutations)
+    void check_mutation_contents(const std::vector<std::string> 
&expected_mutations) const
     {
-        const auto all_mutations = _batcher.move_all_mutations();
+        const auto all_mutations = _batcher->move_all_mutations();
 
         std::vector<std::string> actual_mutations;
         std::transform(all_mutations.begin(),
@@ -71,7 +77,7 @@ public:
     }
 
     std::unique_ptr<replica_duplicator> _duplicator;
-    mutation_batch _batcher;
+    std::unique_ptr<mutation_batch> _batcher;
 };
 
 INSTANTIATE_TEST_SUITE_P(, mutation_batch_test, ::testing::Values(false, 
true));
@@ -80,38 +86,38 @@ TEST_P(mutation_batch_test, prepare_mutation)
 {
     auto mu1 = create_test_mutation(1, 0, "first mutation");
     set_last_applied_decree(1);
-    ASSERT_TRUE(_batcher.add(mu1));
-    ASSERT_EQ(1, _batcher.last_decree());
+    ASSERT_TRUE(_batcher->add(mu1));
+    ASSERT_EQ(1, _batcher->last_decree());
 
     auto mu2 = create_test_mutation(2, 1, "abcde");
     set_last_applied_decree(2);
-    ASSERT_TRUE(_batcher.add(mu2));
-    ASSERT_EQ(2, _batcher.last_decree());
+    ASSERT_TRUE(_batcher->add(mu2));
+    ASSERT_EQ(2, _batcher->last_decree());
 
     auto mu3 = create_test_mutation(3, 2, "hello world");
-    ASSERT_TRUE(_batcher.add(mu3));
+    ASSERT_TRUE(_batcher->add(mu3));
 
     // The last decree has not been updated.
-    ASSERT_EQ(2, _batcher.last_decree());
+    ASSERT_EQ(2, _batcher->last_decree());
 
     auto mu4 = create_test_mutation(4, 2, "foo bar");
-    ASSERT_TRUE(_batcher.add(mu4));
-    ASSERT_EQ(2, _batcher.last_decree());
+    ASSERT_TRUE(_batcher->add(mu4));
+    ASSERT_EQ(2, _batcher->last_decree());
 
     // The committed mutation would be ignored.
     auto mu2_another = create_test_mutation(2, 1, "another second mutation");
-    ASSERT_TRUE(_batcher.add(mu2_another));
-    ASSERT_EQ(2, _batcher.last_decree());
+    ASSERT_TRUE(_batcher->add(mu2_another));
+    ASSERT_EQ(2, _batcher->last_decree());
 
     // The mutation with duplicate decree would be ignored.
     auto mu3_another = create_test_mutation(3, 2, "123 xyz");
-    ASSERT_TRUE(_batcher.add(mu3_another));
-    ASSERT_EQ(2, _batcher.last_decree());
+    ASSERT_TRUE(_batcher->add(mu3_another));
+    ASSERT_EQ(2, _batcher->last_decree());
 
     auto mu5 = create_test_mutation(5, 2, "5th mutation");
     set_last_applied_decree(5);
-    ASSERT_TRUE(_batcher.add(mu5));
-    ASSERT_EQ(5, _batcher.last_decree());
+    ASSERT_TRUE(_batcher->add(mu5));
+    ASSERT_EQ(5, _batcher->last_decree());
 
     check_mutation_contents({"first mutation", "abcde", "hello world", "foo 
bar", "5th mutation"});
 }
@@ -119,7 +125,7 @@ TEST_P(mutation_batch_test, prepare_mutation)
 TEST_P(mutation_batch_test, add_null_mutation)
 {
     auto mu = create_test_mutation(1, nullptr);
-    _batcher.add_mutation_if_valid(mu, 0);
+    _batcher->add_mutation_if_valid(mu, 0);
 
     check_mutation_contents({""});
 }
@@ -127,7 +133,7 @@ TEST_P(mutation_batch_test, add_null_mutation)
 TEST_P(mutation_batch_test, add_empty_mutation)
 {
     auto mu = create_test_mutation(1, "");
-    _batcher.add_mutation_if_valid(mu, 0);
+    _batcher->add_mutation_if_valid(mu, 0);
 
     check_mutation_contents({""});
 }
@@ -138,7 +144,7 @@ TEST_P(mutation_batch_test, add_string_view_mutation)
     auto mu = create_test_mutation(1, nullptr);
     const std::string data("hello");
     mu->data.updates.back().data = blob(data.data(), 0, data.size());
-    _batcher.add_mutation_if_valid(mu, 0);
+    _batcher->add_mutation_if_valid(mu, 0);
 
     check_mutation_contents({"hello"});
 }
@@ -146,7 +152,7 @@ TEST_P(mutation_batch_test, add_string_view_mutation)
 TEST_P(mutation_batch_test, add_a_valid_mutation)
 {
     auto mu = create_test_mutation(1, "hello");
-    _batcher.add_mutation_if_valid(mu, 0);
+    _batcher->add_mutation_if_valid(mu, 0);
 
     check_mutation_contents({"hello"});
 }
@@ -156,13 +162,13 @@ TEST_P(mutation_batch_test, add_multiple_valid_mutations)
     // The mutation could not be reused, since in 
mutation_batch::add_mutation_if_valid
     // the elements of mutation::data::updates would be moved and nullified.
     auto mu1 = create_test_mutation(1, "hello");
-    _batcher.add_mutation_if_valid(mu1, 0);
+    _batcher->add_mutation_if_valid(mu1, 0);
 
     auto mu2 = create_test_mutation(2, "world");
-    _batcher.add_mutation_if_valid(mu2, 2);
+    _batcher->add_mutation_if_valid(mu2, 2);
 
     auto mu3 = create_test_mutation(3, "hi");
-    _batcher.add_mutation_if_valid(mu3, 2);
+    _batcher->add_mutation_if_valid(mu3, 2);
 
     check_mutation_contents({"hello", "world", "hi"});
 }
@@ -170,17 +176,17 @@ TEST_P(mutation_batch_test, add_multiple_valid_mutations)
 TEST_P(mutation_batch_test, add_invalid_mutation)
 {
     auto mu2 = create_test_mutation(2, "world");
-    _batcher.add_mutation_if_valid(mu2, 2);
+    _batcher->add_mutation_if_valid(mu2, 2);
 
     // mu1 would be ignored, since its decree is less than the start decree.
     auto mu1 = create_test_mutation(1, "hello");
-    _batcher.add_mutation_if_valid(mu1, 2);
+    _batcher->add_mutation_if_valid(mu1, 2);
 
     auto mu3 = create_test_mutation(3, "hi");
-    _batcher.add_mutation_if_valid(mu3, 2);
+    _batcher->add_mutation_if_valid(mu3, 2);
 
     auto mu4 = create_test_mutation(1, "ok");
-    _batcher.add_mutation_if_valid(mu4, 1);
+    _batcher->add_mutation_if_valid(mu4, 1);
 
     // "ok" would be the first, since its timestamp (i.e. decree in 
create_test_mutation)
     // is the smallest.
@@ -191,7 +197,7 @@ TEST_P(mutation_batch_test, ignore_non_idempotent_write)
 {
     auto mu = create_test_mutation(1, "hello");
     mu->data.updates[0].code = RPC_DUPLICATION_NON_IDEMPOTENT_WRITE;
-    _batcher.add_mutation_if_valid(mu, 0);
+    _batcher->add_mutation_if_valid(mu, 0);
     check_mutation_contents({});
 }
 
@@ -202,7 +208,7 @@ TEST_P(mutation_batch_test, mutation_buffer_commit)
     // details.
     reset_buffer(10, 15, 20);
     commit_buffer(15);
-    ASSERT_EQ(14, _batcher.last_decree());
+    ASSERT_EQ(14, _batcher->last_decree());
 }
 
 } // namespace dsn::replication
diff --git a/src/replica/duplication/test/replica_duplicator_manager_test.cpp 
b/src/replica/duplication/test/replica_duplicator_manager_test.cpp
index 8123cd623..1f6999757 100644
--- a/src/replica/duplication/test/replica_duplicator_manager_test.cpp
+++ b/src/replica/duplication/test/replica_duplicator_manager_test.cpp
@@ -44,14 +44,15 @@ public:
 
     void test_remove_non_existed_duplications()
     {
-        auto r = stub->add_primary_replica(2, 1);
-        auto &d = r->get_replica_duplicator_manager();
+        auto *rep = stub->add_primary_replica(2, 1);
+        rep->init_private_log(rep->dir());
+        auto &d = rep->get_replica_duplicator_manager();
 
         duplication_entry ent;
         ent.dupid = 1;
         ent.status = duplication_status::DS_PAUSE;
         ent.remote = "dsn://slave-cluster";
-        ent.progress[r->get_gpid().get_partition_index()] = 0;
+        ent.progress[rep->get_gpid().get_partition_index()] = 0;
         d.sync_duplication(ent);
         ASSERT_EQ(d._duplications.size(), 1);
 
@@ -66,20 +67,21 @@ public:
 
     void test_set_confirmed_decree_non_primary()
     {
-        auto r = stub->add_primary_replica(2, 1);
-        auto &d = r->get_replica_duplicator_manager();
+        auto *rep = stub->add_primary_replica(2, 1);
+        rep->init_private_log(rep->dir());
+        auto &d = rep->get_replica_duplicator_manager();
 
         duplication_entry ent;
         ent.dupid = 1;
         ent.status = duplication_status::DS_PAUSE;
         ent.remote = "dsn://slave-cluster";
-        ent.progress[r->get_gpid().get_partition_index()] = 100;
+        ent.progress[rep->get_gpid().get_partition_index()] = 100;
         d.sync_duplication(ent);
         ASSERT_EQ(d._duplications.size(), 1);
         ASSERT_EQ(d._primary_confirmed_decree, invalid_decree);
 
         // replica failover
-        r->as_secondary();
+        rep->as_secondary();
 
         d.update_confirmed_decree_if_secondary(99);
         ASSERT_EQ(d._duplications.size(), 0);
@@ -103,7 +105,8 @@ public:
 
     void test_get_duplication_confirms()
     {
-        auto r = stub->add_primary_replica(2, 1);
+        auto *rep = stub->add_primary_replica(2, 1);
+        rep->init_private_log(rep->dir());
 
         int total_dup_num = 10;
         int update_dup_num = 4; // the number of dups that will be updated
@@ -112,25 +115,25 @@ public:
             duplication_entry ent;
             ent.dupid = id;
             ent.status = duplication_status::DS_PAUSE;
-            ent.progress[r->get_gpid().get_partition_index()] = 0;
+            ent.progress[rep->get_gpid().get_partition_index()] = 0;
 
-            auto dup = std::make_unique<replica_duplicator>(ent, r);
+            auto dup = std::make_unique<replica_duplicator>(ent, rep);
             
dup->update_progress(dup->progress().set_last_decree(2).set_confirmed_decree(1));
-            add_dup(r, std::move(dup));
+            add_dup(rep, std::move(dup));
         }
 
         for (dupid_t id = update_dup_num + 1; id <= total_dup_num; id++) {
             duplication_entry ent;
             ent.dupid = id;
             ent.status = duplication_status::DS_PAUSE;
-            ent.progress[r->get_gpid().get_partition_index()] = 0;
+            ent.progress[rep->get_gpid().get_partition_index()] = 0;
 
-            auto dup = std::make_unique<replica_duplicator>(ent, r);
+            auto dup = std::make_unique<replica_duplicator>(ent, rep);
             
dup->update_progress(dup->progress().set_last_decree(1).set_confirmed_decree(1));
-            add_dup(r, std::move(dup));
+            add_dup(rep, std::move(dup));
         }
 
-        auto result = 
r->get_replica_duplicator_manager().get_duplication_confirms_to_update();
+        auto result = 
rep->get_replica_duplicator_manager().get_duplication_confirms_to_update();
         ASSERT_EQ(result.size(), update_dup_num);
     }
 
@@ -142,24 +145,25 @@ public:
             int64_t min_confirmed_decree;
         };
 
-        auto r = stub->add_non_primary_replica(2, 1);
-        auto assert_test = [r, this](test_case tt) {
+        auto *rep = stub->add_non_primary_replica(2, 1);
+        rep->init_private_log(rep->dir());
+        auto assert_test = [rep, this](test_case tt) {
             for (int id = 1; id <= tt.confirmed_decree.size(); id++) {
                 duplication_entry ent;
                 ent.dupid = id;
                 ent.status = duplication_status::DS_PAUSE;
-                ent.progress[r->get_gpid().get_partition_index()] = 0;
+                ent.progress[rep->get_gpid().get_partition_index()] = 0;
 
-                auto dup = std::make_unique<replica_duplicator>(ent, r);
+                auto dup = std::make_unique<replica_duplicator>(ent, rep);
                 dup->update_progress(dup->progress()
                                          
.set_last_decree(tt.confirmed_decree[id - 1])
                                          
.set_confirmed_decree(tt.confirmed_decree[id - 1]));
-                add_dup(r, std::move(dup));
+                add_dup(rep, std::move(dup));
             }
 
-            
ASSERT_EQ(r->get_replica_duplicator_manager().min_confirmed_decree(),
+            
ASSERT_EQ(rep->get_replica_duplicator_manager().min_confirmed_decree(),
                       tt.min_confirmed_decree);
-            r->get_replica_duplicator_manager()._duplications.clear();
+            rep->get_replica_duplicator_manager()._duplications.clear();
         };
 
         {
@@ -169,7 +173,7 @@ public:
         }
 
         { // primary
-            r->as_primary();
+            rep->as_primary();
             test_case tt{{1, 2, 3}, 1};
             assert_test(tt);
 
@@ -203,17 +207,19 @@ TEST_P(replica_duplicator_manager_test, 
min_confirmed_decree) { test_min_confirm
 
 TEST_P(replica_duplicator_manager_test, update_checkpoint_prepared)
 {
-    auto r = stub->add_primary_replica(2, 1);
+    auto *rep = stub->add_primary_replica(2, 1);
+    rep->init_private_log(rep->dir());
+
     duplication_entry ent;
     ent.dupid = 1;
     ent.status = duplication_status::DS_PAUSE;
-    ent.progress[r->get_gpid().get_partition_index()] = 0;
+    ent.progress[rep->get_gpid().get_partition_index()] = 0;
 
-    auto dup = std::make_unique<replica_duplicator>(ent, r);
-    r->update_last_durable_decree(100);
+    auto dup = std::make_unique<replica_duplicator>(ent, rep);
+    rep->update_last_durable_decree(100);
     
dup->update_progress(dup->progress().set_last_decree(2).set_confirmed_decree(1));
-    add_dup(r, std::move(dup));
-    auto updates = 
r->get_replica_duplicator_manager().get_duplication_confirms_to_update();
+    add_dup(rep, std::move(dup));
+    auto updates = 
rep->get_replica_duplicator_manager().get_duplication_confirms_to_update();
     for (const auto &update : updates) {
         ASSERT_TRUE(update.checkpoint_prepared);
     }
diff --git a/src/replica/duplication/test/replica_duplicator_test.cpp 
b/src/replica/duplication/test/replica_duplicator_test.cpp
index 78e1aabfb..2942d8915 100644
--- a/src/replica/duplication/test/replica_duplicator_test.cpp
+++ b/src/replica/duplication/test/replica_duplicator_test.cpp
@@ -69,12 +69,13 @@ public:
         return dup->_min_checkpoint_decree;
     }
 
-    void test_new_duplicator(const std::string &remote_app_name, bool 
specify_remote_app_name)
+    void test_new_duplicator(const std::string &remote_app_name,
+                             bool specify_remote_app_name,
+                             int64_t confirmed_decree)
     {
         const dupid_t dupid = 1;
         const std::string remote = "remote_address";
         const duplication_status::type status = duplication_status::DS_PAUSE;
-        const int64_t confirmed_decree = 100;
 
         duplication_entry dup_ent;
         dup_ent.dupid = dupid;
@@ -90,8 +91,13 @@ public:
         ASSERT_EQ(remote, duplicator->remote_cluster_name());
         ASSERT_EQ(remote_app_name, duplicator->remote_app_name());
         ASSERT_EQ(status, duplicator->_status);
+        ASSERT_EQ(1, duplicator->_min_checkpoint_decree);
         ASSERT_EQ(confirmed_decree, duplicator->progress().confirmed_decree);
-        ASSERT_EQ(confirmed_decree, duplicator->progress().last_decree);
+        if (confirmed_decree == invalid_decree) {
+            ASSERT_EQ(1, duplicator->progress().last_decree);
+        } else {
+            ASSERT_EQ(confirmed_decree, duplicator->progress().last_decree);
+        }
 
         auto &expected_env = *duplicator;
         ASSERT_EQ(duplicator->tracker(), expected_env.__conf.tracker);
@@ -144,12 +150,25 @@ INSTANTIATE_TEST_SUITE_P(, replica_duplicator_test, 
::testing::Values(false, tru
 
 TEST_P(replica_duplicator_test, new_duplicator_without_remote_app_name)
 {
-    test_new_duplicator("temp", false);
+    test_new_duplicator("temp", false, 100);
 }
 
 TEST_P(replica_duplicator_test, new_duplicator_with_remote_app_name)
 {
-    test_new_duplicator("another_test_app", true);
+    test_new_duplicator("another_test_app", true, 100);
+}
+
+// Initial confirmed decree immediately after the duplication was created is 
`invalid_decree`
+// which was synced from meta server.
+TEST_P(replica_duplicator_test, new_duplicator_with_initial_confirmed_decree)
+{
+    test_new_duplicator("test_initial_confirmed_decree", true, invalid_decree);
+}
+
+// The duplication progressed and confirmed decree became valid.
+TEST_P(replica_duplicator_test, 
new_duplicator_with_non_initial_confirmed_decree)
+{
+    test_new_duplicator("test_non_initial_confirmed_decree", true, 1);
 }
 
 TEST_P(replica_duplicator_test, pause_start_duplication) { 
test_pause_start_duplication(); }
@@ -160,7 +179,7 @@ TEST_P(replica_duplicator_test, duplication_progress)
 
     // Start duplication from empty replica.
     ASSERT_EQ(1, min_checkpoint_decree(duplicator));
-    ASSERT_EQ(0, duplicator->progress().last_decree);
+    ASSERT_EQ(1, duplicator->progress().last_decree);
     ASSERT_EQ(invalid_decree, duplicator->progress().confirmed_decree);
 
     // Update the max decree that has been duplicated to the remote cluster.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to