This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 2f843db2a fix(duplication): `min_checkpoint_decree` was reset to the
invalid decree after replica server was restarted (#2097)
2f843db2a is described below
commit 2f843db2a5dde325822bf423db7f848ec186a3d6
Author: Dan Wang <[email protected]>
AuthorDate: Mon Aug 19 19:54:20 2024 +0800
fix(duplication): `min_checkpoint_decree` was reset to the invalid decree
after replica server was restarted (#2097)
https://github.com/apache/incubator-pegasus/issues/2098
After the replica server was restarted, `min_checkpoint_decree` would be
reset to the invalid decree (-1). This is wrong for an existing duplication
that is still in the status of `DS_PREPARE`, which means the replica would
continue to generate checkpoints for the full duplication. A duplication in
the status of `DS_PREPARE` calls `replica_duplicator::prepare_dup()`, which
asserts that `min_checkpoint_decree` must be greater than 0. Therefore,
the replica server would always exit abnormally due to the failed assertion.
---
src/replica/duplication/replica_duplicator.cpp | 106 +++++++++++++--------
src/replica/duplication/replica_duplicator.h | 4 +-
.../test/dup_replica_http_service_test.cpp | 3 +-
.../test/duplication_sync_timer_test.cpp | 36 ++++---
.../duplication/test/mutation_batch_test.cpp | 82 ++++++++--------
.../test/replica_duplicator_manager_test.cpp | 64 +++++++------
.../duplication/test/replica_duplicator_test.cpp | 31 ++++--
7 files changed, 194 insertions(+), 132 deletions(-)
diff --git a/src/replica/duplication/replica_duplicator.cpp
b/src/replica/duplication/replica_duplicator.cpp
index 6bb75b92c..d19033d4e 100644
--- a/src/replica/duplication/replica_duplicator.cpp
+++ b/src/replica/duplication/replica_duplicator.cpp
@@ -59,42 +59,66 @@ replica_duplicator::replica_duplicator(const
duplication_entry &ent, replica *r)
_stub(r->get_replica_stub()),
METRIC_VAR_INIT_replica(dup_confirmed_mutations)
{
+ // Ensure that the checkpoint decree is at least 1. Otherwise, the
checkpoint could not be
+ // created in time for empty replica; in consequence, the remote cluster
would inevitably
+ // fail to pull the checkpoint files.
+ //
+ // The max decree in rocksdb memtable (the last applied decree) is
considered as the min
+ // decree that should be covered by the checkpoint, which means currently
all of the data
+ // in current rocksdb should be included into the created checkpoint.
+ //
+ // `_min_checkpoint_decree` is not persisted into zk. Once replica server
was restarted,
+ // it would be reset to the decree that is applied most recently.
+ const auto last_applied_decree = _replica->last_applied_decree();
+ _min_checkpoint_decree = std::max(last_applied_decree,
static_cast<decree>(1));
+ LOG_INFO_PREFIX("initialize checkpoint decree: min_checkpoint_decree={}, "
+ "last_committed_decree={}, last_applied_decree={}, "
+ "last_flushed_decree={}, last_durable_decree={}, "
+ "plog_max_decree_on_disk={}, plog_max_commit_on_disk={}",
+ _min_checkpoint_decree,
+ _replica->last_committed_decree(),
+ last_applied_decree,
+ _replica->last_flushed_decree(),
+ _replica->last_durable_decree(),
+ _replica->private_log()->max_decree_on_disk(),
+ _replica->private_log()->max_commit_on_disk());
+
_status = ent.status;
- auto it = ent.progress.find(get_gpid().get_partition_index());
+ const auto it = ent.progress.find(get_gpid().get_partition_index());
+ CHECK_PREFIX_MSG(it != ent.progress.end(),
+ "partition({}) not found in duplication progress: "
+ "app_name={}, dup_id={}, remote_cluster_name={},
remote_app_name={}",
+ get_gpid(),
+ r->get_app_info()->app_name,
+ id(),
+ _remote_cluster_name,
+ _remote_app_name);
+
+ // Initial progress would be `invalid_decree` which was synced from meta
server
+ // immediately after the duplication was created.
+ // See `init_progress()` in
`meta_duplication_service::new_dup_from_init()`.
+ //
+ // _progress.last_decree would be used to update the state in meta server.
+ // See `replica_duplicator_manager::get_duplication_confirms_to_update()`.
if (it->second == invalid_decree) {
- // Ensure that the checkpoint decree is at least 1. Otherwise, the
checkpoint could not be
- // created in time for empty replica; in consequence, the remote
cluster would inevitably
- // fail to pull the checkpoint files.
- //
- // The max decree in rocksdb memtable (the last applied decree) is
considered as the min
- // decree that should be covered by the checkpoint, which means
currently all of the data
- // in current rocksdb should be included into the created checkpoint.
- //
- // TODO(jiashuo1): _min_checkpoint_decree hasn't be ready to persist
zk, so if master
- // restart, the value will be reset to 0.
- const auto last_applied_decree = _replica->last_applied_decree();
- _min_checkpoint_decree = std::max(last_applied_decree,
static_cast<decree>(1));
- _progress.last_decree = last_applied_decree;
- LOG_INFO_PREFIX("initialize checkpoint decree:
min_checkpoint_decree={}, "
- "last_committed_decree={}, last_applied_decree={}, "
- "last_flushed_decree={}, last_durable_decree={}, "
- "plog_max_decree_on_disk={},
plog_max_commit_on_disk={}",
- _min_checkpoint_decree,
- _replica->last_committed_decree(),
- last_applied_decree,
- _replica->last_flushed_decree(),
- _replica->last_durable_decree(),
- _replica->private_log()->max_decree_on_disk(),
- _replica->private_log()->max_commit_on_disk());
-
+ _progress.last_decree = _min_checkpoint_decree;
} else {
_progress.last_decree = _progress.confirmed_decree = it->second;
}
- LOG_INFO_PREFIX("initialize replica_duplicator[{}] [dupid:{},
meta_confirmed_decree:{}]",
- duplication_status_to_string(_status),
+
+ LOG_INFO_PREFIX("initialize replica_duplicator: app_name={}, dup_id={}, "
+ "remote_cluster_name={}, remote_app_name={}, status={}, "
+ "replica_confirmed_decree={}, meta_persisted_decree={}/{}",
+ r->get_app_info()->app_name,
id(),
- it->second);
+ _remote_cluster_name,
+ _remote_app_name,
+ duplication_status_to_string(_status),
+ _progress.last_decree,
+ it->second,
+ _progress.confirmed_decree);
+
thread_pool(LPC_REPLICATION_LOW).task_tracker(tracker()).thread_hash(get_gpid().thread_hash());
if (_status == duplication_status::DS_PREPARE) {
@@ -123,7 +147,8 @@ void replica_duplicator::prepare_dup()
void replica_duplicator::start_dup_log()
{
- LOG_INFO_PREFIX("starting duplication {} [last_decree: {},
confirmed_decree: {}]",
+ LOG_INFO_PREFIX("starting duplication: {}, replica_confirmed_decree={}, "
+ "meta_persisted_decree={}",
to_string(),
_progress.last_decree,
_progress.confirmed_decree);
@@ -261,17 +286,16 @@ error_s replica_duplicator::update_progress(const
duplication_progress &p)
void replica_duplicator::verify_start_decree(decree start_decree)
{
- decree confirmed_decree = progress().confirmed_decree;
- decree last_decree = progress().last_decree;
- decree max_gced_decree = get_max_gced_decree();
- CHECK_LT_MSG(max_gced_decree,
- start_decree,
- "the logs haven't yet duplicated were accidentally truncated "
- "[max_gced_decree: {}, start_decree: {}, confirmed_decree:
{}, last_decree: {}]",
- max_gced_decree,
- start_decree,
- confirmed_decree,
- last_decree);
+ const auto max_gced_decree = get_max_gced_decree();
+ CHECK_LT_PREFIX_MSG(
+ max_gced_decree,
+ start_decree,
+ "the logs haven't yet duplicated were accidentally truncated
[max_gced_decree: {}, "
+ "start_decree: {}, replica_confirmed_decree: {},
meta_persisted_decree: {}]",
+ max_gced_decree,
+ start_decree,
+ progress().last_decree,
+ progress().confirmed_decree);
}
decree replica_duplicator::get_max_gced_decree() const
diff --git a/src/replica/duplication/replica_duplicator.h
b/src/replica/duplication/replica_duplicator.h
index 9a8deed0d..1516ec520 100644
--- a/src/replica/duplication/replica_duplicator.h
+++ b/src/replica/duplication/replica_duplicator.h
@@ -157,8 +157,8 @@ public:
{
zauto_read_lock l(_lock);
- JSON_ENCODE_OBJ(writer, confirmed_decree, _progress.last_decree);
- JSON_ENCODE_OBJ(writer, persisted_decree,
_progress.confirmed_decree);
+ JSON_ENCODE_OBJ(writer, replica_confirmed_decree,
_progress.last_decree);
+ JSON_ENCODE_OBJ(writer, meta_persisted_decree,
_progress.confirmed_decree);
}
writer.EndObject();
diff --git a/src/replica/duplication/test/dup_replica_http_service_test.cpp
b/src/replica/duplication/test/dup_replica_http_service_test.cpp
index 43a0a3515..5fbffbd49 100644
--- a/src/replica/duplication/test/dup_replica_http_service_test.cpp
+++ b/src/replica/duplication/test/dup_replica_http_service_test.cpp
@@ -42,7 +42,8 @@ INSTANTIATE_TEST_SUITE_P(, dup_replica_http_service_test,
::testing::Values(fals
TEST_P(dup_replica_http_service_test, query_duplication_handler)
{
- auto pri = stub->add_primary_replica(1, 1);
+ auto *pri = stub->add_primary_replica(1, 1);
+ pri->init_private_log(pri->dir());
// primary confirmed_decree
duplication_entry ent;
diff --git a/src/replica/duplication/test/duplication_sync_timer_test.cpp
b/src/replica/duplication/test/duplication_sync_timer_test.cpp
index 0f7855ed7..2714af6bd 100644
--- a/src/replica/duplication/test/duplication_sync_timer_test.cpp
+++ b/src/replica/duplication/test/duplication_sync_timer_test.cpp
@@ -52,7 +52,8 @@ public:
static const std::string kTestRemoteAppName = "temp";
// replica: {app_id:2, partition_id:1, duplications:{}}
- stub->add_primary_replica(2, 1);
+ auto *rep = stub->add_primary_replica(2, 1);
+ rep->init_private_log(rep->dir());
ASSERT_NE(stub->find_replica(2, 1), nullptr);
// appid:2 -> dupid:1
@@ -82,15 +83,16 @@ public:
{
int total_app_num = 4;
for (int appid = 1; appid <= total_app_num; appid++) {
- auto r = stub->add_non_primary_replica(appid, 1);
+ auto *rep = stub->add_non_primary_replica(appid, 1);
+ rep->init_private_log(rep->dir());
// trigger duplication sync on partition 1
duplication_entry ent;
ent.dupid = 1;
- ent.progress[r->get_gpid().get_partition_index()] = 1000;
+ ent.progress[rep->get_gpid().get_partition_index()] = 1000;
ent.status = duplication_status::DS_PAUSE;
- auto dup = std::make_unique<replica_duplicator>(ent, r);
- add_dup(r, std::move(dup));
+ auto dup = std::make_unique<replica_duplicator>(ent, rep);
+ add_dup(rep, std::move(dup));
}
RPC_MOCKING(duplication_sync_rpc)
@@ -164,7 +166,8 @@ public:
std::map<int32_t, std::map<dupid_t, duplication_entry>> dup_map;
for (int32_t appid = 1; appid <= 10; appid++) {
for (int partition_id = 0; partition_id < 3; partition_id++) {
- stub->add_primary_replica(appid, partition_id);
+ auto *rep = stub->add_primary_replica(appid, partition_id);
+ rep->init_private_log(rep->dir());
}
}
@@ -254,19 +257,20 @@ public:
void test_update_confirmed_points()
{
for (int32_t appid = 1; appid <= 10; appid++) {
- stub->add_primary_replica(appid, 1);
+ auto *rep = stub->add_primary_replica(appid, 1);
+ rep->init_private_log(rep->dir());
}
for (int appid = 1; appid <= 3; appid++) {
- auto r = stub->find_replica(appid, 1);
+ auto *rep = stub->find_replica(appid, 1);
duplication_entry ent;
ent.dupid = 1;
ent.status = duplication_status::DS_PAUSE;
- ent.progress[r->get_gpid().get_partition_index()] = 0;
- auto dup = std::make_unique<replica_duplicator>(ent, r);
+ ent.progress[rep->get_gpid().get_partition_index()] = 0;
+ auto dup = std::make_unique<replica_duplicator>(ent, rep);
dup->update_progress(dup->progress().set_last_decree(3).set_confirmed_decree(1));
- add_dup(r, std::move(dup));
+ add_dup(rep, std::move(dup));
}
duplication_entry ent;
@@ -280,8 +284,8 @@ public:
dup_sync->on_duplication_sync_reply(ERR_OK, resp);
for (int appid = 1; appid <= 3; appid++) {
- auto r = stub->find_replica(appid, 1);
- auto dup = find_dup(r, 1);
+ auto *rep = stub->find_replica(appid, 1);
+ auto *dup = find_dup(rep, 1);
ASSERT_EQ(3, dup->progress().confirmed_decree);
}
@@ -294,7 +298,8 @@ public:
// 10 primaries
int appid = 1;
for (int partition_id = 0; partition_id < 10; partition_id++) {
- stub->add_primary_replica(appid, partition_id);
+ auto *r = stub->add_primary_replica(appid, partition_id);
+ r->init_private_log(r->dir());
}
duplication_entry ent;
@@ -353,7 +358,8 @@ public:
// there must be some internal problems.
void test_receive_illegal_duplication_status()
{
- stub->add_primary_replica(1, 0);
+ auto *rep = stub->add_primary_replica(1, 0);
+ rep->init_private_log(rep->dir());
duplication_entry ent;
ent.dupid = 2;
diff --git a/src/replica/duplication/test/mutation_batch_test.cpp
b/src/replica/duplication/test/mutation_batch_test.cpp
index c865de20b..862e6276b 100644
--- a/src/replica/duplication/test/mutation_batch_test.cpp
+++ b/src/replica/duplication/test/mutation_batch_test.cpp
@@ -34,6 +34,7 @@
#include "replica/duplication/replica_duplicator.h"
#include "replica/mutation.h"
#include "replica/prepare_list.h"
+#include "replica/test/mock_utils.h"
#include "runtime/task/task_code.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
@@ -42,24 +43,29 @@ namespace dsn::replication {
class mutation_batch_test : public duplication_test_base
{
-public:
- mutation_batch_test() : _duplicator(create_test_duplicator(0)),
_batcher(_duplicator.get()) {}
+protected:
+ mutation_batch_test()
+ {
+ _replica->init_private_log(_replica->dir());
+ _duplicator = create_test_duplicator(0);
+ _batcher = std::make_unique<mutation_batch>(_duplicator.get());
+ }
- void reset_buffer(const decree last_commit, const decree start, const
decree end)
+ void reset_buffer(const decree last_commit, const decree start, const
decree end) const
{
- _batcher._mutation_buffer->reset(last_commit);
- _batcher._mutation_buffer->_start_decree = start;
- _batcher._mutation_buffer->_end_decree = end;
+ _batcher->_mutation_buffer->reset(last_commit);
+ _batcher->_mutation_buffer->_start_decree = start;
+ _batcher->_mutation_buffer->_end_decree = end;
}
- void commit_buffer(const decree current_decree)
+ void commit_buffer(const decree current_decree) const
{
- _batcher._mutation_buffer->commit(current_decree,
COMMIT_TO_DECREE_HARD);
+ _batcher->_mutation_buffer->commit(current_decree,
COMMIT_TO_DECREE_HARD);
}
- void check_mutation_contents(const std::vector<std::string>
&expected_mutations)
+ void check_mutation_contents(const std::vector<std::string>
&expected_mutations) const
{
- const auto all_mutations = _batcher.move_all_mutations();
+ const auto all_mutations = _batcher->move_all_mutations();
std::vector<std::string> actual_mutations;
std::transform(all_mutations.begin(),
@@ -71,7 +77,7 @@ public:
}
std::unique_ptr<replica_duplicator> _duplicator;
- mutation_batch _batcher;
+ std::unique_ptr<mutation_batch> _batcher;
};
INSTANTIATE_TEST_SUITE_P(, mutation_batch_test, ::testing::Values(false,
true));
@@ -80,38 +86,38 @@ TEST_P(mutation_batch_test, prepare_mutation)
{
auto mu1 = create_test_mutation(1, 0, "first mutation");
set_last_applied_decree(1);
- ASSERT_TRUE(_batcher.add(mu1));
- ASSERT_EQ(1, _batcher.last_decree());
+ ASSERT_TRUE(_batcher->add(mu1));
+ ASSERT_EQ(1, _batcher->last_decree());
auto mu2 = create_test_mutation(2, 1, "abcde");
set_last_applied_decree(2);
- ASSERT_TRUE(_batcher.add(mu2));
- ASSERT_EQ(2, _batcher.last_decree());
+ ASSERT_TRUE(_batcher->add(mu2));
+ ASSERT_EQ(2, _batcher->last_decree());
auto mu3 = create_test_mutation(3, 2, "hello world");
- ASSERT_TRUE(_batcher.add(mu3));
+ ASSERT_TRUE(_batcher->add(mu3));
// The last decree has not been updated.
- ASSERT_EQ(2, _batcher.last_decree());
+ ASSERT_EQ(2, _batcher->last_decree());
auto mu4 = create_test_mutation(4, 2, "foo bar");
- ASSERT_TRUE(_batcher.add(mu4));
- ASSERT_EQ(2, _batcher.last_decree());
+ ASSERT_TRUE(_batcher->add(mu4));
+ ASSERT_EQ(2, _batcher->last_decree());
// The committed mutation would be ignored.
auto mu2_another = create_test_mutation(2, 1, "another second mutation");
- ASSERT_TRUE(_batcher.add(mu2_another));
- ASSERT_EQ(2, _batcher.last_decree());
+ ASSERT_TRUE(_batcher->add(mu2_another));
+ ASSERT_EQ(2, _batcher->last_decree());
// The mutation with duplicate decree would be ignored.
auto mu3_another = create_test_mutation(3, 2, "123 xyz");
- ASSERT_TRUE(_batcher.add(mu3_another));
- ASSERT_EQ(2, _batcher.last_decree());
+ ASSERT_TRUE(_batcher->add(mu3_another));
+ ASSERT_EQ(2, _batcher->last_decree());
auto mu5 = create_test_mutation(5, 2, "5th mutation");
set_last_applied_decree(5);
- ASSERT_TRUE(_batcher.add(mu5));
- ASSERT_EQ(5, _batcher.last_decree());
+ ASSERT_TRUE(_batcher->add(mu5));
+ ASSERT_EQ(5, _batcher->last_decree());
check_mutation_contents({"first mutation", "abcde", "hello world", "foo
bar", "5th mutation"});
}
@@ -119,7 +125,7 @@ TEST_P(mutation_batch_test, prepare_mutation)
TEST_P(mutation_batch_test, add_null_mutation)
{
auto mu = create_test_mutation(1, nullptr);
- _batcher.add_mutation_if_valid(mu, 0);
+ _batcher->add_mutation_if_valid(mu, 0);
check_mutation_contents({""});
}
@@ -127,7 +133,7 @@ TEST_P(mutation_batch_test, add_null_mutation)
TEST_P(mutation_batch_test, add_empty_mutation)
{
auto mu = create_test_mutation(1, "");
- _batcher.add_mutation_if_valid(mu, 0);
+ _batcher->add_mutation_if_valid(mu, 0);
check_mutation_contents({""});
}
@@ -138,7 +144,7 @@ TEST_P(mutation_batch_test, add_string_view_mutation)
auto mu = create_test_mutation(1, nullptr);
const std::string data("hello");
mu->data.updates.back().data = blob(data.data(), 0, data.size());
- _batcher.add_mutation_if_valid(mu, 0);
+ _batcher->add_mutation_if_valid(mu, 0);
check_mutation_contents({"hello"});
}
@@ -146,7 +152,7 @@ TEST_P(mutation_batch_test, add_string_view_mutation)
TEST_P(mutation_batch_test, add_a_valid_mutation)
{
auto mu = create_test_mutation(1, "hello");
- _batcher.add_mutation_if_valid(mu, 0);
+ _batcher->add_mutation_if_valid(mu, 0);
check_mutation_contents({"hello"});
}
@@ -156,13 +162,13 @@ TEST_P(mutation_batch_test, add_multiple_valid_mutations)
// The mutation could not be reused, since in
mutation_batch::add_mutation_if_valid
// the elements of mutation::data::updates would be moved and nullified.
auto mu1 = create_test_mutation(1, "hello");
- _batcher.add_mutation_if_valid(mu1, 0);
+ _batcher->add_mutation_if_valid(mu1, 0);
auto mu2 = create_test_mutation(2, "world");
- _batcher.add_mutation_if_valid(mu2, 2);
+ _batcher->add_mutation_if_valid(mu2, 2);
auto mu3 = create_test_mutation(3, "hi");
- _batcher.add_mutation_if_valid(mu3, 2);
+ _batcher->add_mutation_if_valid(mu3, 2);
check_mutation_contents({"hello", "world", "hi"});
}
@@ -170,17 +176,17 @@ TEST_P(mutation_batch_test, add_multiple_valid_mutations)
TEST_P(mutation_batch_test, add_invalid_mutation)
{
auto mu2 = create_test_mutation(2, "world");
- _batcher.add_mutation_if_valid(mu2, 2);
+ _batcher->add_mutation_if_valid(mu2, 2);
// mu1 would be ignored, since its decree is less than the start decree.
auto mu1 = create_test_mutation(1, "hello");
- _batcher.add_mutation_if_valid(mu1, 2);
+ _batcher->add_mutation_if_valid(mu1, 2);
auto mu3 = create_test_mutation(3, "hi");
- _batcher.add_mutation_if_valid(mu3, 2);
+ _batcher->add_mutation_if_valid(mu3, 2);
auto mu4 = create_test_mutation(1, "ok");
- _batcher.add_mutation_if_valid(mu4, 1);
+ _batcher->add_mutation_if_valid(mu4, 1);
// "ok" would be the first, since its timestamp (i.e. decree in
create_test_mutation)
// is the smallest.
@@ -191,7 +197,7 @@ TEST_P(mutation_batch_test, ignore_non_idempotent_write)
{
auto mu = create_test_mutation(1, "hello");
mu->data.updates[0].code = RPC_DUPLICATION_NON_IDEMPOTENT_WRITE;
- _batcher.add_mutation_if_valid(mu, 0);
+ _batcher->add_mutation_if_valid(mu, 0);
check_mutation_contents({});
}
@@ -202,7 +208,7 @@ TEST_P(mutation_batch_test, mutation_buffer_commit)
// details.
reset_buffer(10, 15, 20);
commit_buffer(15);
- ASSERT_EQ(14, _batcher.last_decree());
+ ASSERT_EQ(14, _batcher->last_decree());
}
} // namespace dsn::replication
diff --git a/src/replica/duplication/test/replica_duplicator_manager_test.cpp
b/src/replica/duplication/test/replica_duplicator_manager_test.cpp
index 8123cd623..1f6999757 100644
--- a/src/replica/duplication/test/replica_duplicator_manager_test.cpp
+++ b/src/replica/duplication/test/replica_duplicator_manager_test.cpp
@@ -44,14 +44,15 @@ public:
void test_remove_non_existed_duplications()
{
- auto r = stub->add_primary_replica(2, 1);
- auto &d = r->get_replica_duplicator_manager();
+ auto *rep = stub->add_primary_replica(2, 1);
+ rep->init_private_log(rep->dir());
+ auto &d = rep->get_replica_duplicator_manager();
duplication_entry ent;
ent.dupid = 1;
ent.status = duplication_status::DS_PAUSE;
ent.remote = "dsn://slave-cluster";
- ent.progress[r->get_gpid().get_partition_index()] = 0;
+ ent.progress[rep->get_gpid().get_partition_index()] = 0;
d.sync_duplication(ent);
ASSERT_EQ(d._duplications.size(), 1);
@@ -66,20 +67,21 @@ public:
void test_set_confirmed_decree_non_primary()
{
- auto r = stub->add_primary_replica(2, 1);
- auto &d = r->get_replica_duplicator_manager();
+ auto *rep = stub->add_primary_replica(2, 1);
+ rep->init_private_log(rep->dir());
+ auto &d = rep->get_replica_duplicator_manager();
duplication_entry ent;
ent.dupid = 1;
ent.status = duplication_status::DS_PAUSE;
ent.remote = "dsn://slave-cluster";
- ent.progress[r->get_gpid().get_partition_index()] = 100;
+ ent.progress[rep->get_gpid().get_partition_index()] = 100;
d.sync_duplication(ent);
ASSERT_EQ(d._duplications.size(), 1);
ASSERT_EQ(d._primary_confirmed_decree, invalid_decree);
// replica failover
- r->as_secondary();
+ rep->as_secondary();
d.update_confirmed_decree_if_secondary(99);
ASSERT_EQ(d._duplications.size(), 0);
@@ -103,7 +105,8 @@ public:
void test_get_duplication_confirms()
{
- auto r = stub->add_primary_replica(2, 1);
+ auto *rep = stub->add_primary_replica(2, 1);
+ rep->init_private_log(rep->dir());
int total_dup_num = 10;
int update_dup_num = 4; // the number of dups that will be updated
@@ -112,25 +115,25 @@ public:
duplication_entry ent;
ent.dupid = id;
ent.status = duplication_status::DS_PAUSE;
- ent.progress[r->get_gpid().get_partition_index()] = 0;
+ ent.progress[rep->get_gpid().get_partition_index()] = 0;
- auto dup = std::make_unique<replica_duplicator>(ent, r);
+ auto dup = std::make_unique<replica_duplicator>(ent, rep);
dup->update_progress(dup->progress().set_last_decree(2).set_confirmed_decree(1));
- add_dup(r, std::move(dup));
+ add_dup(rep, std::move(dup));
}
for (dupid_t id = update_dup_num + 1; id <= total_dup_num; id++) {
duplication_entry ent;
ent.dupid = id;
ent.status = duplication_status::DS_PAUSE;
- ent.progress[r->get_gpid().get_partition_index()] = 0;
+ ent.progress[rep->get_gpid().get_partition_index()] = 0;
- auto dup = std::make_unique<replica_duplicator>(ent, r);
+ auto dup = std::make_unique<replica_duplicator>(ent, rep);
dup->update_progress(dup->progress().set_last_decree(1).set_confirmed_decree(1));
- add_dup(r, std::move(dup));
+ add_dup(rep, std::move(dup));
}
- auto result =
r->get_replica_duplicator_manager().get_duplication_confirms_to_update();
+ auto result =
rep->get_replica_duplicator_manager().get_duplication_confirms_to_update();
ASSERT_EQ(result.size(), update_dup_num);
}
@@ -142,24 +145,25 @@ public:
int64_t min_confirmed_decree;
};
- auto r = stub->add_non_primary_replica(2, 1);
- auto assert_test = [r, this](test_case tt) {
+ auto *rep = stub->add_non_primary_replica(2, 1);
+ rep->init_private_log(rep->dir());
+ auto assert_test = [rep, this](test_case tt) {
for (int id = 1; id <= tt.confirmed_decree.size(); id++) {
duplication_entry ent;
ent.dupid = id;
ent.status = duplication_status::DS_PAUSE;
- ent.progress[r->get_gpid().get_partition_index()] = 0;
+ ent.progress[rep->get_gpid().get_partition_index()] = 0;
- auto dup = std::make_unique<replica_duplicator>(ent, r);
+ auto dup = std::make_unique<replica_duplicator>(ent, rep);
dup->update_progress(dup->progress()
.set_last_decree(tt.confirmed_decree[id - 1])
.set_confirmed_decree(tt.confirmed_decree[id - 1]));
- add_dup(r, std::move(dup));
+ add_dup(rep, std::move(dup));
}
-
ASSERT_EQ(r->get_replica_duplicator_manager().min_confirmed_decree(),
+
ASSERT_EQ(rep->get_replica_duplicator_manager().min_confirmed_decree(),
tt.min_confirmed_decree);
- r->get_replica_duplicator_manager()._duplications.clear();
+ rep->get_replica_duplicator_manager()._duplications.clear();
};
{
@@ -169,7 +173,7 @@ public:
}
{ // primary
- r->as_primary();
+ rep->as_primary();
test_case tt{{1, 2, 3}, 1};
assert_test(tt);
@@ -203,17 +207,19 @@ TEST_P(replica_duplicator_manager_test,
min_confirmed_decree) { test_min_confirm
TEST_P(replica_duplicator_manager_test, update_checkpoint_prepared)
{
- auto r = stub->add_primary_replica(2, 1);
+ auto *rep = stub->add_primary_replica(2, 1);
+ rep->init_private_log(rep->dir());
+
duplication_entry ent;
ent.dupid = 1;
ent.status = duplication_status::DS_PAUSE;
- ent.progress[r->get_gpid().get_partition_index()] = 0;
+ ent.progress[rep->get_gpid().get_partition_index()] = 0;
- auto dup = std::make_unique<replica_duplicator>(ent, r);
- r->update_last_durable_decree(100);
+ auto dup = std::make_unique<replica_duplicator>(ent, rep);
+ rep->update_last_durable_decree(100);
dup->update_progress(dup->progress().set_last_decree(2).set_confirmed_decree(1));
- add_dup(r, std::move(dup));
- auto updates =
r->get_replica_duplicator_manager().get_duplication_confirms_to_update();
+ add_dup(rep, std::move(dup));
+ auto updates =
rep->get_replica_duplicator_manager().get_duplication_confirms_to_update();
for (const auto &update : updates) {
ASSERT_TRUE(update.checkpoint_prepared);
}
diff --git a/src/replica/duplication/test/replica_duplicator_test.cpp
b/src/replica/duplication/test/replica_duplicator_test.cpp
index 78e1aabfb..2942d8915 100644
--- a/src/replica/duplication/test/replica_duplicator_test.cpp
+++ b/src/replica/duplication/test/replica_duplicator_test.cpp
@@ -69,12 +69,13 @@ public:
return dup->_min_checkpoint_decree;
}
- void test_new_duplicator(const std::string &remote_app_name, bool
specify_remote_app_name)
+ void test_new_duplicator(const std::string &remote_app_name,
+ bool specify_remote_app_name,
+ int64_t confirmed_decree)
{
const dupid_t dupid = 1;
const std::string remote = "remote_address";
const duplication_status::type status = duplication_status::DS_PAUSE;
- const int64_t confirmed_decree = 100;
duplication_entry dup_ent;
dup_ent.dupid = dupid;
@@ -90,8 +91,13 @@ public:
ASSERT_EQ(remote, duplicator->remote_cluster_name());
ASSERT_EQ(remote_app_name, duplicator->remote_app_name());
ASSERT_EQ(status, duplicator->_status);
+ ASSERT_EQ(1, duplicator->_min_checkpoint_decree);
ASSERT_EQ(confirmed_decree, duplicator->progress().confirmed_decree);
- ASSERT_EQ(confirmed_decree, duplicator->progress().last_decree);
+ if (confirmed_decree == invalid_decree) {
+ ASSERT_EQ(1, duplicator->progress().last_decree);
+ } else {
+ ASSERT_EQ(confirmed_decree, duplicator->progress().last_decree);
+ }
auto &expected_env = *duplicator;
ASSERT_EQ(duplicator->tracker(), expected_env.__conf.tracker);
@@ -144,12 +150,25 @@ INSTANTIATE_TEST_SUITE_P(, replica_duplicator_test,
::testing::Values(false, tru
TEST_P(replica_duplicator_test, new_duplicator_without_remote_app_name)
{
- test_new_duplicator("temp", false);
+ test_new_duplicator("temp", false, 100);
}
TEST_P(replica_duplicator_test, new_duplicator_with_remote_app_name)
{
- test_new_duplicator("another_test_app", true);
+ test_new_duplicator("another_test_app", true, 100);
+}
+
+// Initial confirmed decree immediately after the duplication was created is
`invalid_decree`
+// which was synced from meta server.
+TEST_P(replica_duplicator_test, new_duplicator_with_initial_confirmed_decree)
+{
+ test_new_duplicator("test_initial_confirmed_decree", true, invalid_decree);
+}
+
+// The duplication progressed and confirmed decree became valid.
+TEST_P(replica_duplicator_test,
new_duplicator_with_non_initial_confirmed_decree)
+{
+ test_new_duplicator("test_non_initial_confirmed_decree", true, 1);
}
TEST_P(replica_duplicator_test, pause_start_duplication) {
test_pause_start_duplication(); }
@@ -160,7 +179,7 @@ TEST_P(replica_duplicator_test, duplication_progress)
// Start duplication from empty replica.
ASSERT_EQ(1, min_checkpoint_decree(duplicator));
- ASSERT_EQ(0, duplicator->progress().last_decree);
+ ASSERT_EQ(1, duplicator->progress().last_decree);
ASSERT_EQ(invalid_decree, duplicator->progress().confirmed_decree);
// Update the max decree that has been duplicated to the remote cluster.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]