This is an automated email from the ASF dual-hosted git repository.
gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new aa3998b5f feat(duplication): allow remote replica count to be
specified for duplication (#1993)
aa3998b5f is described below
commit aa3998b5f422dd143a0b89538b25fe22edeee153
Author: Dan Wang <[email protected]>
AuthorDate: Sat May 11 15:52:50 2024 +0800
feat(duplication): allow remote replica count to be specified for
duplication (#1993)
When duplicating from a cluster A that has only one node, where all tables
have just one replica, to another cluster B that has at least 3 nodes, where
all tables have 3 replicas, the target table on B would also be created with
only one replica. Thus we should allow a duplication to be created with a
specified replica count.
---
idl/duplication.thrift | 27 ++++-
src/client/replication_ddl_client.cpp | 4 +-
src/client/replication_ddl_client.h | 3 +-
src/common/duplication_common.cpp | 5 +
src/meta/duplication/duplication_info.cpp | 9 ++
src/meta/duplication/duplication_info.h | 18 ++-
src/meta/duplication/meta_duplication_service.cpp | 52 +++++++--
src/meta/duplication/meta_duplication_service.h | 4 +-
src/meta/test/duplication_info_test.cpp | 39 ++++++-
src/meta/test/meta_duplication_service_test.cpp | 129 ++++++++++++++++++----
src/shell/command_helper.h | 7 +-
src/shell/commands/duplication.cpp | 30 ++---
src/shell/commands/local_partition_split.cpp | 4 +-
src/shell/main.cpp | 3 +-
14 files changed, 267 insertions(+), 67 deletions(-)
diff --git a/idl/duplication.thrift b/idl/duplication.thrift
index 682273b1f..140361ca9 100644
--- a/idl/duplication.thrift
+++ b/idl/duplication.thrift
@@ -70,6 +70,12 @@ struct duplication_add_request
// Since v2.6.0.
// Specify the app name of remote cluster.
4:optional string remote_app_name;
+
+ // Since v2.6.0.
+ // Specify the replica count of remote app.
+ // 0 means that the replica count of the remote app would be the same as
+ // the source app.
+ 5:optional i32 remote_replica_count;
}
struct duplication_add_response
@@ -84,13 +90,23 @@ struct duplication_add_response
// Since v2.6.0.
//
- // If new duplication is created, this would be its remote_app_name;
- // Otherwise, once the duplication has existed, this would be the
- // remote_app_name with which the duplication has been created.
+ // If new duplication is created, this would be requested remote_app_name
in
+ // duplication_add_request; otherwise, once the duplication has existed,
this
+ // would be the remote app name with which the duplication has been
created.
//
// This field could also be used to check if the meta server supports
// remote_app_name(i.e. the version of meta server must be >= v2.6.0).
5:optional string remote_app_name;
+
+ // Since v2.6.0.
+ //
+ // If new duplication is created, this would be requested
remote_replica_count in
+ // duplication_add_request; otherwise, once the duplication has existed,
this would
+ // be the remote replica count with which the duplication has been created.
+ //
+ // This field could also be used to check if the meta server supports
+ // remote_replica_count(i.e. the version of meta server must be >= v2.6.0).
+ 6:optional i32 remote_replica_count;
}
// This request is sent from client to meta.
@@ -129,6 +145,11 @@ struct duplication_entry
// For versions >= v2.6.0, this could be specified by client.
// For versions < v2.6.0, this must be the same with source app_name.
8:optional string remote_app_name;
+
+ // Since v2.6.0.
+ // For versions >= v2.6.0, this could be specified by client.
+ // For versions < v2.6.0, this must be the same with source replica_count.
+ 9:optional i32 remote_replica_count;
}
// This request is sent from client to meta.
diff --git a/src/client/replication_ddl_client.cpp
b/src/client/replication_ddl_client.cpp
index 3e8ba5979..a71241362 100644
--- a/src/client/replication_ddl_client.cpp
+++ b/src/client/replication_ddl_client.cpp
@@ -1362,7 +1362,8 @@ error_with<duplication_add_response>
replication_ddl_client::add_dup(const std::string &app_name,
const std::string &remote_cluster_name,
bool is_duplicating_checkpoint,
- const std::string &remote_app_name)
+ const std::string &remote_app_name,
+ const uint32_t remote_replica_count)
{
RETURN_EW_NOT_OK_MSG(validate_app_name(remote_app_name, false),
duplication_add_response,
@@ -1374,6 +1375,7 @@ replication_ddl_client::add_dup(const std::string
&app_name,
req->remote_cluster_name = remote_cluster_name;
req->is_duplicating_checkpoint = is_duplicating_checkpoint;
req->__set_remote_app_name(remote_app_name);
+
req->__set_remote_replica_count(static_cast<int32_t>(remote_replica_count));
return call_rpc_sync(duplication_add_rpc(std::move(req),
RPC_CM_ADD_DUPLICATION));
}
diff --git a/src/client/replication_ddl_client.h
b/src/client/replication_ddl_client.h
index 7907aa79a..69b3ae571 100644
--- a/src/client/replication_ddl_client.h
+++ b/src/client/replication_ddl_client.h
@@ -144,7 +144,8 @@ public:
error_with<duplication_add_response> add_dup(const std::string &app_name,
const std::string
&remote_address,
bool
is_duplicating_checkpoint,
- const std::string
&remote_app_name);
+ const std::string
&remote_app_name,
+ const uint32_t
remote_replica_count);
error_with<duplication_modify_response>
change_dup_status(const std::string &app_name, int dupid,
duplication_status::type status);
diff --git a/src/common/duplication_common.cpp
b/src/common/duplication_common.cpp
index 906b5eca5..6c6e1ad03 100644
--- a/src/common/duplication_common.cpp
+++ b/src/common/duplication_common.cpp
@@ -159,6 +159,11 @@ static nlohmann::json duplication_entry_to_json(const
duplication_entry &ent)
json["remote_app_name"] = ent.remote_app_name;
}
+ if (ent.__isset.remote_replica_count) {
+ // remote_replica_count is supported since v2.6.0, thus it won't be
shown before v2.6.0.
+ json["remote_replica_count"] = ent.remote_replica_count;
+ }
+
return json;
}
diff --git a/src/meta/duplication/duplication_info.cpp
b/src/meta/duplication/duplication_info.cpp
index 85a4413d1..4036cf293 100644
--- a/src/meta/duplication/duplication_info.cpp
+++ b/src/meta/duplication/duplication_info.cpp
@@ -190,6 +190,7 @@ blob duplication_info::to_json_blob() const
copy.status = _next_status;
copy.fail_mode = _next_fail_mode;
copy.remote_app_name = remote_app_name;
+ copy.remote_replica_count = remote_replica_count;
return json::json_forwarder<json_helper>::encode(copy);
}
@@ -206,6 +207,7 @@ duplication_info_s_ptr
duplication_info::decode_from_blob(dupid_t dup_id,
int32_t app_id,
const std::string
&app_name,
int32_t
partition_count,
+ int32_t
replica_count,
const std::string
&store_path,
const blob &json)
{
@@ -220,6 +222,12 @@ duplication_info_s_ptr
duplication_info::decode_from_blob(dupid_t dup_id,
info.remote_app_name = app_name;
}
+ if (info.remote_replica_count == 0) {
+ // remote_replica_count is missing, which means meta data in remote
storage(zk) is
+ // still of old version(< v2.6.0).
+ info.remote_replica_count = replica_count;
+ }
+
std::vector<host_port> meta_list;
if (!dsn::replication::replica_helper::load_servers_from_config(
duplication_constants::kClustersSectionName, info.remote,
meta_list)) {
@@ -230,6 +238,7 @@ duplication_info_s_ptr
duplication_info::decode_from_blob(dupid_t dup_id,
app_id,
app_name,
partition_count,
+ info.remote_replica_count,
info.create_timestamp_ms,
info.remote,
info.remote_app_name,
diff --git a/src/meta/duplication/duplication_info.h
b/src/meta/duplication/duplication_info.h
index e010eb4a7..feb750308 100644
--- a/src/meta/duplication/duplication_info.h
+++ b/src/meta/duplication/duplication_info.h
@@ -56,6 +56,7 @@ public:
int32_t app_id,
const std::string &app_name,
int32_t partition_count,
+ int32_t remote_replica_count,
uint64_t create_now_ms,
const std::string &remote_cluster_name,
const std::string &remote_app_name,
@@ -65,6 +66,7 @@ public:
app_id(app_id),
app_name(app_name),
partition_count(partition_count),
+ remote_replica_count(remote_replica_count),
remote_cluster_name(remote_cluster_name),
remote_app_name(remote_app_name),
remote_cluster_metas(std::move(remote_cluster_metas)),
@@ -139,6 +141,7 @@ public:
int32_t app_id,
const std::string &app_name,
int32_t partition_count,
+ int32_t replica_count,
const std::string
&store_path,
const blob &json);
@@ -156,6 +159,7 @@ public:
entry.status = _status;
entry.__set_fail_mode(_fail_mode);
entry.__set_remote_app_name(remote_app_name);
+ entry.__set_remote_replica_count(remote_replica_count);
entry.__isset.progress = true;
for (const auto &kv : _progress) {
if (!kv.second.is_inited) {
@@ -240,11 +244,14 @@ private:
int64_t create_timestamp_ms;
duplication_fail_mode::type fail_mode;
std::string remote_app_name;
-
- // Since there is no remote_cluster_name for old versions(< v2.6.0),
remote_app_name is
- // optional. Following deserialization functions could be compatible
with the situations
- // where remote_app_name is missing.
- DEFINE_JSON_SERIALIZATION(remote, status, create_timestamp_ms,
fail_mode, remote_app_name);
+ int32_t remote_replica_count{0};
+
+ // Since there is no remote_cluster_name for old versions(< v2.6.0),
remote_app_name
+ // and remote_replica_count are optional. Following deserialization
functions could
+ // be compatible with the situations where remote_app_name and
remote_replica_count
+ // are missing.
+ DEFINE_JSON_SERIALIZATION(
+ remote, status, create_timestamp_ms, fail_mode, remote_app_name,
remote_replica_count);
};
public:
@@ -252,6 +259,7 @@ public:
const int32_t app_id{0};
const std::string app_name;
const int32_t partition_count{0};
+ const int32_t remote_replica_count{0};
const std::string remote_cluster_name;
const std::string remote_app_name;
diff --git a/src/meta/duplication/meta_duplication_service.cpp
b/src/meta/duplication/meta_duplication_service.cpp
index 14602ee7d..2673f5055 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -193,11 +193,15 @@ void
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
remote_app_name = request.app_name;
}
+ int32_t remote_replica_count =
+ request.__isset.remote_replica_count ? request.remote_replica_count :
0;
+
LOG_INFO("add duplication for app({}), remote cluster name is {}, "
- "remote app name is {}",
+ "remote app name is {}, remote replica count is {}",
request.app_name,
request.remote_cluster_name,
- remote_app_name);
+ remote_app_name,
+ remote_replica_count);
LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(request.remote_cluster_name !=
get_current_cluster_name(),
@@ -223,6 +227,13 @@ void
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
request.remote_cluster_name,
duplication_constants::kClustersSectionName);
+ LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(
+ remote_replica_count >= 0,
+ response,
+ ERR_INVALID_PARAMETERS,
+ "invalid remote_replica_count({}) which should never be negative",
+ remote_replica_count);
+
std::shared_ptr<app_state> app;
duplication_info_s_ptr dup;
{
@@ -249,9 +260,16 @@ void
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
}
}
+ if (remote_replica_count == 0) {
+ // 0 means that the replica count of the remote app would be the
same as the
+ // source app.
+ remote_replica_count = app->max_replica_count;
+ }
+
if (dup) {
// The duplication for the same app to the same remote cluster has
existed.
remote_app_name = dup->remote_app_name;
+ remote_replica_count = dup->remote_replica_count;
LOG_INFO("no need to add duplication, since it has existed:
app_name={}, "
"remote_cluster_name={}, remote_app_name={}",
request.app_name,
@@ -283,18 +301,22 @@ void
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
}
if (!dup) {
- dup = new_dup_from_init(
- request.remote_cluster_name, remote_app_name,
std::move(meta_list), app);
+ dup = new_dup_from_init(request.remote_cluster_name,
+ remote_app_name,
+ remote_replica_count,
+ std::move(meta_list),
+ app);
}
- do_add_duplication(app, dup, rpc, remote_app_name);
+ do_add_duplication(app, dup, rpc, remote_app_name, remote_replica_count);
}
// ThreadPool(WRITE): THREAD_POOL_META_STATE
void meta_duplication_service::do_add_duplication(std::shared_ptr<app_state>
&app,
duplication_info_s_ptr &dup,
duplication_add_rpc &rpc,
- const std::string
&remote_app_name)
+ const std::string
&remote_app_name,
+ const int32_t
remote_replica_count)
{
const auto &ec = dup->start(rpc.request().is_duplicating_checkpoint);
LOG_ERROR_DUP_HINT_AND_RETURN_IF_NOT(ec == ERR_OK,
@@ -308,7 +330,9 @@ void
meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &ap
auto value = dup->to_json_blob();
std::queue<std::string> nodes({get_duplication_path(*app),
std::to_string(dup->id)});
_meta_svc->get_meta_storage()->create_node_recursively(
- std::move(nodes), std::move(value), [app, this, dup, rpc,
remote_app_name]() mutable {
+ std::move(nodes),
+ std::move(value),
+ [app, this, dup, rpc, remote_app_name, remote_replica_count]() mutable
{
LOG_INFO("[{}] add duplication successfully [app_name: {},
follower: {}]",
dup->log_prefix(),
app->app_name,
@@ -322,6 +346,7 @@ void
meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &ap
resp.appid = app->app_id;
resp.dupid = dup->id;
resp.__set_remote_app_name(remote_app_name);
+ resp.__set_remote_replica_count(remote_replica_count);
zauto_write_lock l(app_lock());
refresh_duplicating_no_lock(app);
@@ -434,7 +459,7 @@ void
meta_duplication_service::create_follower_app_for_duplication(
request.app_name = dup->remote_app_name;
request.options.app_type = app->app_type;
request.options.partition_count = app->partition_count;
- request.options.replica_count = app->max_replica_count;
+ request.options.replica_count = dup->remote_replica_count;
request.options.success_if_exist = false;
request.options.envs = app->envs;
request.options.is_stateful = app->is_stateful;
@@ -618,6 +643,7 @@ void
meta_duplication_service::do_update_partition_confirmed(
std::shared_ptr<duplication_info>
meta_duplication_service::new_dup_from_init(const std::string
&remote_cluster_name,
const std::string &remote_app_name,
+ const int32_t remote_replica_count,
std::vector<host_port>
&&remote_cluster_metas,
std::shared_ptr<app_state> &app)
const
{
@@ -637,6 +663,7 @@ meta_duplication_service::new_dup_from_init(const
std::string &remote_cluster_na
app->app_id,
app->app_name,
app->partition_count,
+ remote_replica_count,
dsn_now_ms(),
remote_cluster_name,
remote_app_name,
@@ -741,8 +768,13 @@ void
meta_duplication_service::do_restore_duplication(dupid_t dup_id,
[ dup_id, this, app = std::move(app), store_path ](const blob &json) {
zauto_write_lock l(app_lock());
- auto dup = duplication_info::decode_from_blob(
- dup_id, app->app_id, app->app_name, app->partition_count,
store_path, json);
+ auto dup = duplication_info::decode_from_blob(dup_id,
+ app->app_id,
+ app->app_name,
+ app->partition_count,
+
app->max_replica_count,
+ store_path,
+ json);
if (nullptr == dup) {
LOG_ERROR("failed to decode json \"{}\" on path {}", json,
store_path);
return; // fail fast
diff --git a/src/meta/duplication/meta_duplication_service.h
b/src/meta/duplication/meta_duplication_service.h
index adb1df7ea..2bcda880e 100644
--- a/src/meta/duplication/meta_duplication_service.h
+++ b/src/meta/duplication/meta_duplication_service.h
@@ -81,7 +81,8 @@ private:
void do_add_duplication(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
duplication_add_rpc &rpc,
- const std::string &remote_app_name);
+ const std::string &remote_app_name,
+ const int32_t remote_replica_count);
void do_modify_duplication(std::shared_ptr<app_state> &app,
duplication_info_s_ptr &dup,
@@ -124,6 +125,7 @@ private:
std::shared_ptr<duplication_info>
new_dup_from_init(const std::string &remote_cluster_name,
const std::string &remote_app_name,
+ const int32_t remote_replica_count,
std::vector<host_port> &&remote_cluster_metas,
std::shared_ptr<app_state> &app) const;
diff --git a/src/meta/test/duplication_info_test.cpp
b/src/meta/test/duplication_info_test.cpp
index 05f0dbcdd..5cfa4e12b 100644
--- a/src/meta/test/duplication_info_test.cpp
+++ b/src/meta/test/duplication_info_test.cpp
@@ -41,6 +41,7 @@ public:
static const std::string kTestRemoteClusterName;
static const std::string kTestRemoteAppName;
static const std::string kTestMetaStorePath;
+ static const int32_t kTestRemoteReplicaCount;
void force_update_status(duplication_info &dup, duplication_status::type
status)
{
@@ -54,6 +55,7 @@ public:
1,
kTestAppName,
2,
+ kTestRemoteReplicaCount,
0,
kTestRemoteClusterName,
kTestRemoteAppName,
@@ -108,6 +110,7 @@ public:
1,
kTestAppName,
4,
+ kTestRemoteReplicaCount,
0,
kTestRemoteClusterName,
kTestRemoteAppName,
@@ -120,6 +123,7 @@ public:
auto dup_ent = dup.to_duplication_entry();
ASSERT_EQ(0, dup_ent.progress.size());
ASSERT_EQ(kTestRemoteAppName, dup_ent.remote_app_name);
+ ASSERT_EQ(kTestRemoteReplicaCount, dup_ent.remote_replica_count);
for (int i = 0; i < 4; i++) {
dup.init_progress(i, invalid_decree);
@@ -140,6 +144,7 @@ public:
1,
kTestAppName,
4,
+ kTestRemoteReplicaCount,
0,
kTestRemoteClusterName,
kTestRemoteAppName,
@@ -160,6 +165,7 @@ public:
1,
kTestAppName,
4,
+ kTestRemoteReplicaCount,
0,
kTestRemoteClusterName,
kTestRemoteAppName,
@@ -169,7 +175,7 @@ public:
dup.persist_status();
dup.alter_status(duplication_status::DS_APP);
- auto json = dup.to_json_blob();
+ const auto &json = dup.to_json_blob();
dup.persist_status();
duplication_info::json_helper copy;
@@ -178,9 +184,12 @@ public:
ASSERT_EQ(dup.create_timestamp_ms, copy.create_timestamp_ms);
ASSERT_EQ(dup.remote_cluster_name, copy.remote);
ASSERT_EQ(dup.remote_app_name, copy.remote_app_name);
+ ASSERT_EQ(kTestRemoteAppName, copy.remote_app_name);
+ ASSERT_EQ(dup.remote_replica_count, copy.remote_replica_count);
+ ASSERT_EQ(kTestRemoteReplicaCount, copy.remote_replica_count);
- auto dup_sptr =
- duplication_info::decode_from_blob(1, 1, kTestAppName, 4,
kTestMetaStorePath, json);
+ auto dup_sptr = duplication_info::decode_from_blob(
+ 1, 1, kTestAppName, 4, kTestRemoteReplicaCount,
kTestMetaStorePath, json);
ASSERT_TRUE(dup_sptr->equals_to(dup)) << *dup_sptr << " " << dup;
blob new_json =
@@ -188,12 +197,31 @@ public:
ASSERT_FALSE(json::json_forwarder<duplication_info::json_helper>::decode(new_json,
copy));
ASSERT_EQ(duplication_status::DS_REMOVED, copy.status);
}
+
+ static void test_encode_and_decode_default()
+ {
+ dsn_run_config("config-test.ini", false);
+
+ duplication_info::json_helper copy;
+ copy.status = duplication_status::DS_INIT;
+ copy.fail_mode = duplication_fail_mode::FAIL_SLOW;
+ copy.remote = kTestRemoteClusterName;
+ ASSERT_TRUE(copy.remote_app_name.empty());
+ ASSERT_EQ(0, copy.remote_replica_count);
+
+ const auto json =
json::json_forwarder<duplication_info::json_helper>::encode(copy);
+ auto dup = duplication_info::decode_from_blob(
+ 1, 1, kTestAppName, 4, kTestRemoteReplicaCount,
kTestMetaStorePath, json);
+ ASSERT_EQ(kTestAppName, dup->remote_app_name);
+ ASSERT_EQ(kTestRemoteReplicaCount, dup->remote_replica_count);
+ }
};
const std::string duplication_info_test::kTestAppName = "temp";
const std::string duplication_info_test::kTestRemoteClusterName =
"slave-cluster";
const std::string duplication_info_test::kTestRemoteAppName = "remote_temp";
const std::string duplication_info_test::kTestMetaStorePath =
"/meta_test/101/duplication/1";
+const int32_t duplication_info_test::kTestRemoteReplicaCount = 3;
TEST_F(duplication_info_test, alter_status_when_busy)
{
@@ -201,6 +229,7 @@ TEST_F(duplication_info_test, alter_status_when_busy)
1,
kTestAppName,
4,
+ kTestRemoteReplicaCount,
0,
kTestRemoteClusterName,
kTestRemoteAppName,
@@ -274,6 +303,7 @@ TEST_F(duplication_info_test, alter_status)
1,
kTestAppName,
4,
+ kTestRemoteReplicaCount,
0,
kTestRemoteClusterName,
kTestRemoteAppName,
@@ -299,12 +329,15 @@ TEST_F(duplication_info_test, init_and_start) {
test_init_and_start(); }
TEST_F(duplication_info_test, encode_and_decode) { test_encode_and_decode(); }
+TEST_F(duplication_info_test, encode_and_decode_default) {
test_encode_and_decode_default(); }
+
TEST_F(duplication_info_test, is_valid)
{
duplication_info dup(1,
1,
kTestAppName,
4,
+ kTestRemoteReplicaCount,
0,
kTestRemoteClusterName,
kTestRemoteAppName,
diff --git a/src/meta/test/meta_duplication_service_test.cpp
b/src/meta/test/meta_duplication_service_test.cpp
index 5f7600312..78c0de661 100644
--- a/src/meta/test/meta_duplication_service_test.cpp
+++ b/src/meta/test/meta_duplication_service_test.cpp
@@ -73,17 +73,23 @@ public:
static const std::string kTestAppName;
static const std::string kTestRemoteClusterName;
static const std::string kTestRemoteAppName;
+ static const int32_t kTestRemoteReplicaCount;
meta_duplication_service_test() {}
duplication_add_response create_dup(const std::string &app_name,
const std::string &remote_cluster,
- const std::string &remote_app_name)
+ const bool specified,
+ const std::string &remote_app_name,
+ const int32_t remote_replica_count)
{
auto req = std::make_unique<duplication_add_request>();
req->app_name = app_name;
req->remote_cluster_name = remote_cluster;
- req->__set_remote_app_name(remote_app_name);
+ if (specified) {
+ req->__set_remote_app_name(remote_app_name);
+ req->__set_remote_replica_count(remote_replica_count);
+ }
duplication_add_rpc rpc(std::move(req), RPC_CM_ADD_DUPLICATION);
dup_svc().add_duplication(rpc);
@@ -92,14 +98,29 @@ public:
}
duplication_add_response create_dup(const std::string &app_name,
- const std::string &remote_cluster)
+ const std::string &remote_cluster,
+ const std::string &remote_app_name,
+ const int32_t remote_replica_count)
+ {
+ return create_dup(app_name, remote_cluster, true, remote_app_name,
remote_replica_count);
+ }
+
+ duplication_add_response create_dup_unspecified(const std::string
&app_name,
+ const std::string
&remote_cluster)
{
- return create_dup(app_name, remote_cluster, app_name);
+ return create_dup(app_name, remote_cluster, false, "", 0);
+ }
+
+ duplication_add_response create_dup(const std::string &app_name,
+ const std::string &remote_cluster,
+ const int32_t remote_replica_count)
+ {
+ return create_dup(app_name, remote_cluster, app_name,
remote_replica_count);
}
duplication_add_response create_dup(const std::string &app_name)
{
- return create_dup(app_name, kTestRemoteClusterName);
+ return create_dup(app_name, kTestRemoteClusterName,
kTestRemoteReplicaCount);
}
duplication_query_response query_dup_info(const std::string &app_name)
@@ -195,17 +216,17 @@ public:
int last_dup = 0;
for (int i = 0; i < 1000; i++) {
- auto dup =
- dup_svc().new_dup_from_init(kTestRemoteClusterName,
kTestRemoteAppName, {}, app);
+ auto dup = dup_svc().new_dup_from_init(
+ kTestRemoteClusterName, kTestRemoteAppName,
kTestRemoteReplicaCount, {}, app);
ASSERT_GT(dup->id, 0);
ASSERT_FALSE(dup->is_altering());
- ASSERT_EQ(dup->_status, duplication_status::DS_INIT);
- ASSERT_EQ(dup->_next_status, duplication_status::DS_INIT);
+ ASSERT_EQ(duplication_status::DS_INIT, dup->_status);
+ ASSERT_EQ(duplication_status::DS_INIT, dup->_next_status);
auto ent = dup->to_duplication_entry();
for (int j = 0; j < app->partition_count; j++) {
- ASSERT_EQ(ent.progress[j], invalid_decree);
+ ASSERT_EQ(invalid_decree, ent.progress[j]);
}
if (last_dup != 0) {
@@ -327,7 +348,7 @@ public:
SetUp();
create_app(kTestAppName);
app = find_app(kTestAppName);
- auto test_dup = create_dup(kTestAppName, kTestRemoteClusterName);
+ auto test_dup = create_dup(kTestAppName, kTestRemoteClusterName,
kTestRemoteReplicaCount);
ASSERT_EQ(test_dup.err, ERR_OK);
duplication_info_s_ptr dup = app->duplications[test_dup.dupid];
_ms->get_meta_storage()->create_node(meta_duplication_service::get_partition_path(dup,
"0"),
@@ -348,7 +369,7 @@ public:
SetUp();
create_app(kTestAppName);
app = find_app(kTestAppName);
- test_dup = create_dup(kTestAppName, kTestRemoteClusterName);
+ test_dup = create_dup(kTestAppName, kTestRemoteClusterName,
kTestRemoteReplicaCount);
ASSERT_EQ(test_dup.err, ERR_OK);
dup = app->duplications[test_dup.dupid];
_ms->get_meta_storage()->create_node(meta_duplication_service::get_partition_path(dup,
"x"),
@@ -368,47 +389,100 @@ public:
{
static const std::string kTestSameAppName(kTestAppName + "_same");
static const std::string kTestAnotherAppName(kTestAppName +
"_another");
+ static const std::string kTestUnspecifiedAppName(kTestAppName +
"_unspecified");
create_app(kTestAppName);
create_app(kTestSameAppName);
create_app(kTestAnotherAppName);
+ create_app(kTestUnspecifiedAppName);
struct TestData
{
std::string app_name;
std::string remote;
+
+ bool specified;
std::string remote_app_name;
+ int32_t remote_replica_count;
error_code wec;
} tests[] = {
// The general case that duplicating to remote cluster with
specified remote_app_name.
- {kTestAppName, kTestRemoteClusterName, kTestRemoteAppName, ERR_OK},
+ {kTestAppName,
+ kTestRemoteClusterName,
+ true,
+ kTestRemoteAppName,
+ kTestRemoteReplicaCount,
+ ERR_OK},
// A duplication that has been added would be found with its
original remote_app_name.
- {kTestAppName, kTestRemoteClusterName, kTestRemoteAppName, ERR_OK},
+ {kTestAppName,
+ kTestRemoteClusterName,
+ true,
+ kTestRemoteAppName,
+ kTestRemoteReplicaCount,
+ ERR_OK},
// The general case that duplicating to remote cluster with same
remote_app_name.
- {kTestSameAppName, kTestRemoteClusterName, kTestSameAppName,
ERR_OK},
+ {kTestSameAppName,
+ kTestRemoteClusterName,
+ true,
+ kTestSameAppName,
+ kTestRemoteReplicaCount,
+ ERR_OK},
// It is not allowed that remote_cluster_name does not exist in
"duplication-group".
- {kTestAppName, "test-invalid-remote", kTestRemoteAppName,
ERR_INVALID_PARAMETERS},
+ {kTestAppName,
+ "test-invalid-remote",
+ true,
+ kTestRemoteAppName,
+ kTestRemoteReplicaCount,
+ ERR_INVALID_PARAMETERS},
// Duplicating to local cluster is not allowed.
- {kTestAppName, get_current_cluster_name(), kTestRemoteAppName,
ERR_INVALID_PARAMETERS},
+ {kTestAppName,
+ get_current_cluster_name(),
+ true,
+ kTestRemoteAppName,
+ kTestRemoteReplicaCount,
+ ERR_INVALID_PARAMETERS},
// It is not allowed that remote_cluster_name exists in
"duplication-group" but not
// exists in "pegasus.clusters".
{kTestAppName,
"cluster_without_address_for_test",
+ true,
kTestRemoteAppName,
+ kTestRemoteReplicaCount,
ERR_INVALID_PARAMETERS},
// The attempt that duplicates another app to the same remote app
would be blocked.
{kTestAnotherAppName,
kTestRemoteClusterName,
+ true,
kTestRemoteAppName,
+ kTestRemoteReplicaCount,
ERR_INVALID_PARAMETERS},
// The attempt that duplicates another app to the different remote
app would be
// ok.
- {kTestAnotherAppName, kTestRemoteClusterName, kTestAppName,
ERR_OK},
+ {kTestAnotherAppName,
+ kTestRemoteClusterName,
+ true,
+ kTestAppName,
+ kTestRemoteReplicaCount,
+ ERR_OK},
+ // Add a duplication without specifying remote_app_name and
remote_replica_count.
+ {kTestUnspecifiedAppName,
+ kTestRemoteClusterName,
+ false,
+ kTestUnspecifiedAppName,
+ 3,
+ ERR_OK},
};
for (auto test : tests) {
- auto resp = create_dup(test.app_name, test.remote,
test.remote_app_name);
+ duplication_add_response resp;
+ if (test.specified) {
+ resp = create_dup(
+ test.app_name, test.remote, test.remote_app_name,
test.remote_replica_count);
+ } else {
+ resp = create_dup_unspecified(test.app_name, test.remote);
+ }
+
ASSERT_EQ(test.wec, resp.err);
if (test.wec != ERR_OK) {
@@ -423,6 +497,8 @@ public:
ASSERT_EQ(test.remote, dup->remote_cluster_name);
ASSERT_EQ(test.remote_app_name, resp.remote_app_name);
ASSERT_EQ(test.remote_app_name, dup->remote_app_name);
+ ASSERT_EQ(test.remote_replica_count, resp.remote_replica_count);
+ ASSERT_EQ(test.remote_replica_count, dup->remote_replica_count);
ASSERT_EQ(resp.dupid, dup->id);
ASSERT_TRUE(app->duplicating);
}
@@ -432,6 +508,7 @@ public:
const std::string meta_duplication_service_test::kTestAppName = "test_app";
const std::string meta_duplication_service_test::kTestRemoteClusterName =
"slave-cluster";
const std::string meta_duplication_service_test::kTestRemoteAppName =
"remote_test_app";
+const int32_t meta_duplication_service_test::kTestRemoteReplicaCount = 3;
// This test ensures that duplication upon an unavailable app will
// be rejected with ERR_APP_NOT_EXIST.
@@ -494,14 +571,16 @@ TEST_F(meta_duplication_service_test,
dont_create_if_existed)
}
}
-TEST_F(meta_duplication_service_test, add_dup_with_remote_app_name)
+TEST_F(meta_duplication_service_test, add_dup_with_params)
{
create_app(kTestAppName);
auto app = find_app(kTestAppName);
ASSERT_EQ(kTestAppName, app->app_name);
- create_dup(kTestAppName, kTestRemoteClusterName, kTestRemoteAppName);
- const dupid_t dupid = create_dup(kTestAppName).dupid;
+ const dupid_t dupid =
+ create_dup(
+ kTestAppName, kTestRemoteClusterName, kTestRemoteAppName,
kTestRemoteReplicaCount)
+ .dupid;
const auto &resp = query_dup_info(kTestAppName);
ASSERT_EQ(ERR_OK, resp.err);
@@ -511,6 +590,7 @@ TEST_F(meta_duplication_service_test,
add_dup_with_remote_app_name)
ASSERT_EQ(dupid, duplication_entry.dupid);
ASSERT_EQ(duplication_status::DS_PREPARE, duplication_entry.status);
ASSERT_EQ(kTestRemoteAppName, duplication_entry.remote_app_name);
+ ASSERT_EQ(kTestRemoteReplicaCount, duplication_entry.remote_replica_count);
}
TEST_F(meta_duplication_service_test, change_duplication_status)
@@ -754,7 +834,7 @@ TEST_F(meta_duplication_service_test,
recover_from_corrupted_meta_data)
TEST_F(meta_duplication_service_test, query_duplication_handler)
{
create_app(kTestAppName);
- create_dup(kTestAppName, kTestRemoteClusterName, kTestRemoteAppName);
+ create_dup(kTestAppName, kTestRemoteClusterName, kTestRemoteAppName,
kTestRemoteReplicaCount);
meta_http_service mhs(_ms.get());
http_request fake_req;
@@ -776,7 +856,8 @@ TEST_F(meta_duplication_service_test,
query_duplication_handler)
ASSERT_EQ(std::string() + R"({"1":{"create_ts":")" + ts_buf +
R"(","dupid":)" +
std::to_string(dup->id) +
R"(,"fail_mode":"FAIL_SLOW","remote":"slave-cluster")"
-
R"(,"remote_app_name":"remote_test_app","status":"DS_PREPARE"},"appid":2})",
+
R"(,"remote_app_name":"remote_test_app","remote_replica_count":3)"
+ R"(,"status":"DS_PREPARE"},"appid":2})",
fake_resp.body);
}
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index b0ad80515..70fc62119 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -925,9 +925,12 @@ private:
// A helper macro to parse an optional command argument, the result is filled
in an uint32_t
// variable 'value'.
-#define PARSE_OPT_UINT(name, value, def_val)
\
+//
+// Variable arguments are `name` or `init_list` of argh::parser::operator().
See argh::parser
+// for details.
+#define PARSE_OPT_UINT(value, def_val, ...)
\
do {
\
- const auto param = cmd(name, (def_val)).str();
\
+ const auto param = cmd(__VA_ARGS__, (def_val)).str();
\
if (!::dsn::buf2uint32(param, value)) {
\
fmt::print(stderr, "invalid command, '{}' should be an unsigned
integer\n", param); \
return false;
\
diff --git a/src/shell/commands/duplication.cpp
b/src/shell/commands/duplication.cpp
index f974ad078..097c1ec73 100644
--- a/src/shell/commands/duplication.cpp
+++ b/src/shell/commands/duplication.cpp
@@ -33,13 +33,13 @@
#include "duplication_types.h"
#include "shell/argh.h"
#include "shell/command_executor.h"
+#include "shell/command_helper.h"
#include "shell/commands.h"
#include "shell/sds/sds.h"
#include "utils/error_code.h"
#include "utils/errors.h"
#include "utils/fmt_logging.h"
#include "utils/output_utils.h"
-#include "utils/ports.h"
#include "utils/string_conv.h"
#include "utils/time_utils.h"
@@ -49,20 +49,14 @@ using dsn::replication::duplication_status;
bool add_dup(command_executor *e, shell_context *sc, arguments args)
{
// add_dup <app_name> <remote_cluster_name> [-s|--sst]
[-a|--remote_app_name str]
+ // [-r|--remote_replica_count num]
argh::parser cmd(args.argc, args.argv);
- if (cmd.pos_args().size() > 4) {
+ if (cmd.pos_args().size() > 5) {
fmt::print(stderr, "too many params\n");
return false;
}
- for (const auto &flag : cmd.flags()) {
- if (dsn_unlikely(flag != "s" && flag != "sst")) {
- fmt::print(stderr, "unknown flag {}\n", flag);
- return false;
- }
- }
-
if (!cmd(1)) {
fmt::print(stderr, "missing param <app_name>\n");
return false;
@@ -82,14 +76,22 @@ bool add_dup(command_executor *e, shell_context *sc,
arguments args)
}
// Check if the boolean option is specified.
- bool is_duplicating_checkpoint = cmd[{"-s", "--sst"}];
+ const auto is_duplicating_checkpoint = cmd[{"-s", "--sst"}];
// Read the app name of the remote cluster, if any.
// Otherwise, use app_name as the remote_app_name.
- std::string remote_app_name(cmd({"-a", "--remote_app_name"},
app_name).str());
-
- auto err_resp = sc->ddl_client->add_dup(
- app_name, remote_cluster_name, is_duplicating_checkpoint,
remote_app_name);
+ const std::string remote_app_name(cmd({"-a", "--remote_app_name"},
app_name).str());
+
+ // 0 represents that remote_replica_count is missing, which means the
replica count of
+ // the remote app would be the same as the source app.
+ uint32_t remote_replica_count = 0;
+ PARSE_OPT_UINT(remote_replica_count, 0, {"-r", "--remote_replica_count"});
+
+ auto err_resp = sc->ddl_client->add_dup(app_name,
+ remote_cluster_name,
+ is_duplicating_checkpoint,
+ remote_app_name,
+ remote_replica_count);
auto err = err_resp.get_error();
std::string hint;
if (err) {
diff --git a/src/shell/commands/local_partition_split.cpp
b/src/shell/commands/local_partition_split.cpp
index 7634a4f7c..6cc4726fa 100644
--- a/src/shell/commands/local_partition_split.cpp
+++ b/src/shell/commands/local_partition_split.cpp
@@ -697,8 +697,8 @@ bool local_partition_split(command_executor *e,
shell_context *sc, arguments arg
PARSE_UINT(lpsc.src_partition_count);
PARSE_UINT(lpsc.dst_partition_count);
lpsc.dst_app_name = cmd(param_index++).str();
- PARSE_OPT_UINT("threads_per_data_dir", lpsc.threads_per_data_dir, 1);
- PARSE_OPT_UINT("threads_per_partition", lpsc.threads_per_partition, 1);
+ PARSE_OPT_UINT(lpsc.threads_per_data_dir, 1, "threads_per_data_dir");
+ PARSE_OPT_UINT(lpsc.threads_per_partition, 1, "threads_per_partition");
lpsc.post_full_compact = cmd["--post_full_compact"];
lpsc.post_count = cmd["--post_count"];
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 4932a58fe..3b279824c 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -475,7 +475,8 @@ static command_executor commands[] = {
},
{"add_dup",
"add duplication",
- "<app_name> <remote_cluster_name> [-s|--sst] [-a|--remote_app_name str]",
+ "<app_name> <remote_cluster_name> [-s|--sst] [-a|--remote_app_name str] "
+ "[-r|--remote_replica_count num]",
add_dup},
{"query_dup", "query duplication info", "<app_name> [-d|--detail]",
query_dup},
{"remove_dup", "remove duplication", "<app_name> <dup_id>", remove_dup},
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]