This is an automated email from the ASF dual-hosted git repository.

gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new aa3998b5f feat(duplication): allow remote replica count to be 
specified for duplication (#1993)
aa3998b5f is described below

commit aa3998b5f422dd143a0b89538b25fe22edeee153
Author: Dan Wang <[email protected]>
AuthorDate: Sat May 11 15:52:50 2024 +0800

    feat(duplication): allow remote replica count to be specified for 
duplication (#1993)
    
    When duplicating from a cluster A that has only one node (so that all of its
    tables have just one replica) to another cluster B that has at least 3 nodes
    (where all tables have 3 replicas), the target table on B would also be
    created with only one replica. Thus we should allow a duplication to be
    created with a specified replica count for the remote table.
---
 idl/duplication.thrift                            |  27 ++++-
 src/client/replication_ddl_client.cpp             |   4 +-
 src/client/replication_ddl_client.h               |   3 +-
 src/common/duplication_common.cpp                 |   5 +
 src/meta/duplication/duplication_info.cpp         |   9 ++
 src/meta/duplication/duplication_info.h           |  18 ++-
 src/meta/duplication/meta_duplication_service.cpp |  52 +++++++--
 src/meta/duplication/meta_duplication_service.h   |   4 +-
 src/meta/test/duplication_info_test.cpp           |  39 ++++++-
 src/meta/test/meta_duplication_service_test.cpp   | 129 ++++++++++++++++++----
 src/shell/command_helper.h                        |   7 +-
 src/shell/commands/duplication.cpp                |  30 ++---
 src/shell/commands/local_partition_split.cpp      |   4 +-
 src/shell/main.cpp                                |   3 +-
 14 files changed, 267 insertions(+), 67 deletions(-)

diff --git a/idl/duplication.thrift b/idl/duplication.thrift
index 682273b1f..140361ca9 100644
--- a/idl/duplication.thrift
+++ b/idl/duplication.thrift
@@ -70,6 +70,12 @@ struct duplication_add_request
     // Since v2.6.0.
     // Specify the app name of remote cluster.
     4:optional string remote_app_name;
+
+    // Since v2.6.0.
+    // Specify the replica count of remote app.
+    // 0 means that the replica count of the remote app would be the same as
+    // the source app.
+    5:optional i32 remote_replica_count;
 }
 
 struct duplication_add_response
@@ -84,13 +90,23 @@ struct duplication_add_response
 
     // Since v2.6.0.
     //
-    // If new duplication is created, this would be its remote_app_name;
-    // Otherwise, once the duplication has existed, this would be the
-    // remote_app_name with which the duplication has been created.
+    // If new duplication is created, this would be requested remote_app_name 
in
+    // duplication_add_request; otherwise, once the duplication has existed, 
this
+    // would be the remote app name with which the duplication has been 
created.
     //
     // This field could also be used to check if the meta server supports
     // remote_app_name(i.e. the version of meta server must be >= v2.6.0).
     5:optional string remote_app_name;
+
+    // Since v2.6.0.
+    //
+    // If new duplication is created, this would be requested 
remote_replica_count in
+    // duplication_add_request; otherwise, once the duplication has existed, 
this would
+    // be the remote replica count with which the duplication has been created.
+    //
+    // This field could also be used to check if the meta server supports
+    // remote_replica_count(i.e. the version of meta server must be >= v2.6.0).
+    6:optional i32 remote_replica_count;
 }
 
 // This request is sent from client to meta.
@@ -129,6 +145,11 @@ struct duplication_entry
     // For versions >= v2.6.0, this could be specified by client.
     // For versions < v2.6.0, this must be the same with source app_name.
     8:optional string remote_app_name;
+
+    // Since v2.6.0.
+    // For versions >= v2.6.0, this could be specified by client.
+    // For versions < v2.6.0, this must be the same with source replica_count.
+    9:optional i32 remote_replica_count;
 }
 
 // This request is sent from client to meta.
diff --git a/src/client/replication_ddl_client.cpp 
b/src/client/replication_ddl_client.cpp
index 3e8ba5979..a71241362 100644
--- a/src/client/replication_ddl_client.cpp
+++ b/src/client/replication_ddl_client.cpp
@@ -1362,7 +1362,8 @@ error_with<duplication_add_response>
 replication_ddl_client::add_dup(const std::string &app_name,
                                 const std::string &remote_cluster_name,
                                 bool is_duplicating_checkpoint,
-                                const std::string &remote_app_name)
+                                const std::string &remote_app_name,
+                                const uint32_t remote_replica_count)
 {
     RETURN_EW_NOT_OK_MSG(validate_app_name(remote_app_name, false),
                          duplication_add_response,
@@ -1374,6 +1375,7 @@ replication_ddl_client::add_dup(const std::string 
&app_name,
     req->remote_cluster_name = remote_cluster_name;
     req->is_duplicating_checkpoint = is_duplicating_checkpoint;
     req->__set_remote_app_name(remote_app_name);
+    
req->__set_remote_replica_count(static_cast<int32_t>(remote_replica_count));
     return call_rpc_sync(duplication_add_rpc(std::move(req), 
RPC_CM_ADD_DUPLICATION));
 }
 
diff --git a/src/client/replication_ddl_client.h 
b/src/client/replication_ddl_client.h
index 7907aa79a..69b3ae571 100644
--- a/src/client/replication_ddl_client.h
+++ b/src/client/replication_ddl_client.h
@@ -144,7 +144,8 @@ public:
     error_with<duplication_add_response> add_dup(const std::string &app_name,
                                                  const std::string 
&remote_address,
                                                  bool 
is_duplicating_checkpoint,
-                                                 const std::string 
&remote_app_name);
+                                                 const std::string 
&remote_app_name,
+                                                 const uint32_t 
remote_replica_count);
 
     error_with<duplication_modify_response>
     change_dup_status(const std::string &app_name, int dupid, 
duplication_status::type status);
diff --git a/src/common/duplication_common.cpp 
b/src/common/duplication_common.cpp
index 906b5eca5..6c6e1ad03 100644
--- a/src/common/duplication_common.cpp
+++ b/src/common/duplication_common.cpp
@@ -159,6 +159,11 @@ static nlohmann::json duplication_entry_to_json(const 
duplication_entry &ent)
         json["remote_app_name"] = ent.remote_app_name;
     }
 
+    if (ent.__isset.remote_replica_count) {
+        // remote_replica_count is supported since v2.6.0, thus it won't be 
shown before v2.6.0.
+        json["remote_replica_count"] = ent.remote_replica_count;
+    }
+
     return json;
 }
 
diff --git a/src/meta/duplication/duplication_info.cpp 
b/src/meta/duplication/duplication_info.cpp
index 85a4413d1..4036cf293 100644
--- a/src/meta/duplication/duplication_info.cpp
+++ b/src/meta/duplication/duplication_info.cpp
@@ -190,6 +190,7 @@ blob duplication_info::to_json_blob() const
     copy.status = _next_status;
     copy.fail_mode = _next_fail_mode;
     copy.remote_app_name = remote_app_name;
+    copy.remote_replica_count = remote_replica_count;
     return json::json_forwarder<json_helper>::encode(copy);
 }
 
@@ -206,6 +207,7 @@ duplication_info_s_ptr 
duplication_info::decode_from_blob(dupid_t dup_id,
                                                           int32_t app_id,
                                                           const std::string 
&app_name,
                                                           int32_t 
partition_count,
+                                                          int32_t 
replica_count,
                                                           const std::string 
&store_path,
                                                           const blob &json)
 {
@@ -220,6 +222,12 @@ duplication_info_s_ptr 
duplication_info::decode_from_blob(dupid_t dup_id,
         info.remote_app_name = app_name;
     }
 
+    if (info.remote_replica_count == 0) {
+        // remote_replica_count is missing, which means meta data in remote 
storage(zk) is
+        // still of old version(< v2.6.0).
+        info.remote_replica_count = replica_count;
+    }
+
     std::vector<host_port> meta_list;
     if (!dsn::replication::replica_helper::load_servers_from_config(
             duplication_constants::kClustersSectionName, info.remote, 
meta_list)) {
@@ -230,6 +238,7 @@ duplication_info_s_ptr 
duplication_info::decode_from_blob(dupid_t dup_id,
                                                   app_id,
                                                   app_name,
                                                   partition_count,
+                                                  info.remote_replica_count,
                                                   info.create_timestamp_ms,
                                                   info.remote,
                                                   info.remote_app_name,
diff --git a/src/meta/duplication/duplication_info.h 
b/src/meta/duplication/duplication_info.h
index e010eb4a7..feb750308 100644
--- a/src/meta/duplication/duplication_info.h
+++ b/src/meta/duplication/duplication_info.h
@@ -56,6 +56,7 @@ public:
                      int32_t app_id,
                      const std::string &app_name,
                      int32_t partition_count,
+                     int32_t remote_replica_count,
                      uint64_t create_now_ms,
                      const std::string &remote_cluster_name,
                      const std::string &remote_app_name,
@@ -65,6 +66,7 @@ public:
           app_id(app_id),
           app_name(app_name),
           partition_count(partition_count),
+          remote_replica_count(remote_replica_count),
           remote_cluster_name(remote_cluster_name),
           remote_app_name(remote_app_name),
           remote_cluster_metas(std::move(remote_cluster_metas)),
@@ -139,6 +141,7 @@ public:
                                                    int32_t app_id,
                                                    const std::string &app_name,
                                                    int32_t partition_count,
+                                                   int32_t replica_count,
                                                    const std::string 
&store_path,
                                                    const blob &json);
 
@@ -156,6 +159,7 @@ public:
         entry.status = _status;
         entry.__set_fail_mode(_fail_mode);
         entry.__set_remote_app_name(remote_app_name);
+        entry.__set_remote_replica_count(remote_replica_count);
         entry.__isset.progress = true;
         for (const auto &kv : _progress) {
             if (!kv.second.is_inited) {
@@ -240,11 +244,14 @@ private:
         int64_t create_timestamp_ms;
         duplication_fail_mode::type fail_mode;
         std::string remote_app_name;
-
-        // Since there is no remote_cluster_name for old versions(< v2.6.0), 
remote_app_name is
-        // optional. Following deserialization functions could be compatible 
with the situations
-        // where remote_app_name is missing.
-        DEFINE_JSON_SERIALIZATION(remote, status, create_timestamp_ms, 
fail_mode, remote_app_name);
+        int32_t remote_replica_count{0};
+
+        // Since there is no remote_cluster_name for old versions(< v2.6.0), 
remote_app_name
+        // and remote_replica_count are optional. Following deserialization 
functions could
+        // be compatible with the situations where remote_app_name and 
remote_replica_count
+        // are missing.
+        DEFINE_JSON_SERIALIZATION(
+            remote, status, create_timestamp_ms, fail_mode, remote_app_name, 
remote_replica_count);
     };
 
 public:
@@ -252,6 +259,7 @@ public:
     const int32_t app_id{0};
     const std::string app_name;
     const int32_t partition_count{0};
+    const int32_t remote_replica_count{0};
 
     const std::string remote_cluster_name;
     const std::string remote_app_name;
diff --git a/src/meta/duplication/meta_duplication_service.cpp 
b/src/meta/duplication/meta_duplication_service.cpp
index 14602ee7d..2673f5055 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -193,11 +193,15 @@ void 
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
         remote_app_name = request.app_name;
     }
 
+    int32_t remote_replica_count =
+        request.__isset.remote_replica_count ? request.remote_replica_count : 
0;
+
     LOG_INFO("add duplication for app({}), remote cluster name is {}, "
-             "remote app name is {}",
+             "remote app name is {}, remote replica count is {}",
              request.app_name,
              request.remote_cluster_name,
-             remote_app_name);
+             remote_app_name,
+             remote_replica_count);
 
     LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(request.remote_cluster_name !=
                                                get_current_cluster_name(),
@@ -223,6 +227,13 @@ void 
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
         request.remote_cluster_name,
         duplication_constants::kClustersSectionName);
 
+    LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(
+        remote_replica_count >= 0,
+        response,
+        ERR_INVALID_PARAMETERS,
+        "invalid remote_replica_count({}) which should never be negative",
+        remote_replica_count);
+
     std::shared_ptr<app_state> app;
     duplication_info_s_ptr dup;
     {
@@ -249,9 +260,16 @@ void 
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
             }
         }
 
+        if (remote_replica_count == 0) {
+            // 0 means that the replica count of the remote app would be the 
same as the
+            // source app.
+            remote_replica_count = app->max_replica_count;
+        }
+
         if (dup) {
             // The duplication for the same app to the same remote cluster has 
existed.
             remote_app_name = dup->remote_app_name;
+            remote_replica_count = dup->remote_replica_count;
             LOG_INFO("no need to add duplication, since it has existed: 
app_name={}, "
                      "remote_cluster_name={}, remote_app_name={}",
                      request.app_name,
@@ -283,18 +301,22 @@ void 
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
     }
 
     if (!dup) {
-        dup = new_dup_from_init(
-            request.remote_cluster_name, remote_app_name, 
std::move(meta_list), app);
+        dup = new_dup_from_init(request.remote_cluster_name,
+                                remote_app_name,
+                                remote_replica_count,
+                                std::move(meta_list),
+                                app);
     }
 
-    do_add_duplication(app, dup, rpc, remote_app_name);
+    do_add_duplication(app, dup, rpc, remote_app_name, remote_replica_count);
 }
 
 // ThreadPool(WRITE): THREAD_POOL_META_STATE
 void meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> 
&app,
                                                   duplication_info_s_ptr &dup,
                                                   duplication_add_rpc &rpc,
-                                                  const std::string 
&remote_app_name)
+                                                  const std::string 
&remote_app_name,
+                                                  const int32_t 
remote_replica_count)
 {
     const auto &ec = dup->start(rpc.request().is_duplicating_checkpoint);
     LOG_ERROR_DUP_HINT_AND_RETURN_IF_NOT(ec == ERR_OK,
@@ -308,7 +330,9 @@ void 
meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &ap
     auto value = dup->to_json_blob();
     std::queue<std::string> nodes({get_duplication_path(*app), 
std::to_string(dup->id)});
     _meta_svc->get_meta_storage()->create_node_recursively(
-        std::move(nodes), std::move(value), [app, this, dup, rpc, 
remote_app_name]() mutable {
+        std::move(nodes),
+        std::move(value),
+        [app, this, dup, rpc, remote_app_name, remote_replica_count]() mutable 
{
             LOG_INFO("[{}] add duplication successfully [app_name: {}, 
follower: {}]",
                      dup->log_prefix(),
                      app->app_name,
@@ -322,6 +346,7 @@ void 
meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &ap
             resp.appid = app->app_id;
             resp.dupid = dup->id;
             resp.__set_remote_app_name(remote_app_name);
+            resp.__set_remote_replica_count(remote_replica_count);
 
             zauto_write_lock l(app_lock());
             refresh_duplicating_no_lock(app);
@@ -434,7 +459,7 @@ void 
meta_duplication_service::create_follower_app_for_duplication(
     request.app_name = dup->remote_app_name;
     request.options.app_type = app->app_type;
     request.options.partition_count = app->partition_count;
-    request.options.replica_count = app->max_replica_count;
+    request.options.replica_count = dup->remote_replica_count;
     request.options.success_if_exist = false;
     request.options.envs = app->envs;
     request.options.is_stateful = app->is_stateful;
@@ -618,6 +643,7 @@ void 
meta_duplication_service::do_update_partition_confirmed(
 std::shared_ptr<duplication_info>
 meta_duplication_service::new_dup_from_init(const std::string 
&remote_cluster_name,
                                             const std::string &remote_app_name,
+                                            const int32_t remote_replica_count,
                                             std::vector<host_port> 
&&remote_cluster_metas,
                                             std::shared_ptr<app_state> &app) 
const
 {
@@ -637,6 +663,7 @@ meta_duplication_service::new_dup_from_init(const 
std::string &remote_cluster_na
                                                  app->app_id,
                                                  app->app_name,
                                                  app->partition_count,
+                                                 remote_replica_count,
                                                  dsn_now_ms(),
                                                  remote_cluster_name,
                                                  remote_app_name,
@@ -741,8 +768,13 @@ void 
meta_duplication_service::do_restore_duplication(dupid_t dup_id,
         [ dup_id, this, app = std::move(app), store_path ](const blob &json) {
             zauto_write_lock l(app_lock());
 
-            auto dup = duplication_info::decode_from_blob(
-                dup_id, app->app_id, app->app_name, app->partition_count, 
store_path, json);
+            auto dup = duplication_info::decode_from_blob(dup_id,
+                                                          app->app_id,
+                                                          app->app_name,
+                                                          app->partition_count,
+                                                          
app->max_replica_count,
+                                                          store_path,
+                                                          json);
             if (nullptr == dup) {
                 LOG_ERROR("failed to decode json \"{}\" on path {}", json, 
store_path);
                 return; // fail fast
diff --git a/src/meta/duplication/meta_duplication_service.h 
b/src/meta/duplication/meta_duplication_service.h
index adb1df7ea..2bcda880e 100644
--- a/src/meta/duplication/meta_duplication_service.h
+++ b/src/meta/duplication/meta_duplication_service.h
@@ -81,7 +81,8 @@ private:
     void do_add_duplication(std::shared_ptr<app_state> &app,
                             duplication_info_s_ptr &dup,
                             duplication_add_rpc &rpc,
-                            const std::string &remote_app_name);
+                            const std::string &remote_app_name,
+                            const int32_t remote_replica_count);
 
     void do_modify_duplication(std::shared_ptr<app_state> &app,
                                duplication_info_s_ptr &dup,
@@ -124,6 +125,7 @@ private:
     std::shared_ptr<duplication_info>
     new_dup_from_init(const std::string &remote_cluster_name,
                       const std::string &remote_app_name,
+                      const int32_t remote_replica_count,
                       std::vector<host_port> &&remote_cluster_metas,
                       std::shared_ptr<app_state> &app) const;
 
diff --git a/src/meta/test/duplication_info_test.cpp 
b/src/meta/test/duplication_info_test.cpp
index 05f0dbcdd..5cfa4e12b 100644
--- a/src/meta/test/duplication_info_test.cpp
+++ b/src/meta/test/duplication_info_test.cpp
@@ -41,6 +41,7 @@ public:
     static const std::string kTestRemoteClusterName;
     static const std::string kTestRemoteAppName;
     static const std::string kTestMetaStorePath;
+    static const int32_t kTestRemoteReplicaCount;
 
     void force_update_status(duplication_info &dup, duplication_status::type 
status)
     {
@@ -54,6 +55,7 @@ public:
                              1,
                              kTestAppName,
                              2,
+                             kTestRemoteReplicaCount,
                              0,
                              kTestRemoteClusterName,
                              kTestRemoteAppName,
@@ -108,6 +110,7 @@ public:
                              1,
                              kTestAppName,
                              4,
+                             kTestRemoteReplicaCount,
                              0,
                              kTestRemoteClusterName,
                              kTestRemoteAppName,
@@ -120,6 +123,7 @@ public:
         auto dup_ent = dup.to_duplication_entry();
         ASSERT_EQ(0, dup_ent.progress.size());
         ASSERT_EQ(kTestRemoteAppName, dup_ent.remote_app_name);
+        ASSERT_EQ(kTestRemoteReplicaCount, dup_ent.remote_replica_count);
 
         for (int i = 0; i < 4; i++) {
             dup.init_progress(i, invalid_decree);
@@ -140,6 +144,7 @@ public:
                              1,
                              kTestAppName,
                              4,
+                             kTestRemoteReplicaCount,
                              0,
                              kTestRemoteClusterName,
                              kTestRemoteAppName,
@@ -160,6 +165,7 @@ public:
                              1,
                              kTestAppName,
                              4,
+                             kTestRemoteReplicaCount,
                              0,
                              kTestRemoteClusterName,
                              kTestRemoteAppName,
@@ -169,7 +175,7 @@ public:
         dup.persist_status();
 
         dup.alter_status(duplication_status::DS_APP);
-        auto json = dup.to_json_blob();
+        const auto &json = dup.to_json_blob();
         dup.persist_status();
 
         duplication_info::json_helper copy;
@@ -178,9 +184,12 @@ public:
         ASSERT_EQ(dup.create_timestamp_ms, copy.create_timestamp_ms);
         ASSERT_EQ(dup.remote_cluster_name, copy.remote);
         ASSERT_EQ(dup.remote_app_name, copy.remote_app_name);
+        ASSERT_EQ(kTestRemoteAppName, copy.remote_app_name);
+        ASSERT_EQ(dup.remote_replica_count, copy.remote_replica_count);
+        ASSERT_EQ(kTestRemoteReplicaCount, copy.remote_replica_count);
 
-        auto dup_sptr =
-            duplication_info::decode_from_blob(1, 1, kTestAppName, 4, 
kTestMetaStorePath, json);
+        auto dup_sptr = duplication_info::decode_from_blob(
+            1, 1, kTestAppName, 4, kTestRemoteReplicaCount, 
kTestMetaStorePath, json);
         ASSERT_TRUE(dup_sptr->equals_to(dup)) << *dup_sptr << " " << dup;
 
         blob new_json =
@@ -188,12 +197,31 @@ public:
         
ASSERT_FALSE(json::json_forwarder<duplication_info::json_helper>::decode(new_json,
 copy));
         ASSERT_EQ(duplication_status::DS_REMOVED, copy.status);
     }
+
+    static void test_encode_and_decode_default()
+    {
+        dsn_run_config("config-test.ini", false);
+
+        duplication_info::json_helper copy;
+        copy.status = duplication_status::DS_INIT;
+        copy.fail_mode = duplication_fail_mode::FAIL_SLOW;
+        copy.remote = kTestRemoteClusterName;
+        ASSERT_TRUE(copy.remote_app_name.empty());
+        ASSERT_EQ(0, copy.remote_replica_count);
+
+        const auto json = 
json::json_forwarder<duplication_info::json_helper>::encode(copy);
+        auto dup = duplication_info::decode_from_blob(
+            1, 1, kTestAppName, 4, kTestRemoteReplicaCount, 
kTestMetaStorePath, json);
+        ASSERT_EQ(kTestAppName, dup->remote_app_name);
+        ASSERT_EQ(kTestRemoteReplicaCount, dup->remote_replica_count);
+    }
 };
 
 const std::string duplication_info_test::kTestAppName = "temp";
 const std::string duplication_info_test::kTestRemoteClusterName = 
"slave-cluster";
 const std::string duplication_info_test::kTestRemoteAppName = "remote_temp";
 const std::string duplication_info_test::kTestMetaStorePath = 
"/meta_test/101/duplication/1";
+const int32_t duplication_info_test::kTestRemoteReplicaCount = 3;
 
 TEST_F(duplication_info_test, alter_status_when_busy)
 {
@@ -201,6 +229,7 @@ TEST_F(duplication_info_test, alter_status_when_busy)
                          1,
                          kTestAppName,
                          4,
+                         kTestRemoteReplicaCount,
                          0,
                          kTestRemoteClusterName,
                          kTestRemoteAppName,
@@ -274,6 +303,7 @@ TEST_F(duplication_info_test, alter_status)
                              1,
                              kTestAppName,
                              4,
+                             kTestRemoteReplicaCount,
                              0,
                              kTestRemoteClusterName,
                              kTestRemoteAppName,
@@ -299,12 +329,15 @@ TEST_F(duplication_info_test, init_and_start) { 
test_init_and_start(); }
 
 TEST_F(duplication_info_test, encode_and_decode) { test_encode_and_decode(); }
 
+TEST_F(duplication_info_test, encode_and_decode_default) { 
test_encode_and_decode_default(); }
+
 TEST_F(duplication_info_test, is_valid)
 {
     duplication_info dup(1,
                          1,
                          kTestAppName,
                          4,
+                         kTestRemoteReplicaCount,
                          0,
                          kTestRemoteClusterName,
                          kTestRemoteAppName,
diff --git a/src/meta/test/meta_duplication_service_test.cpp 
b/src/meta/test/meta_duplication_service_test.cpp
index 5f7600312..78c0de661 100644
--- a/src/meta/test/meta_duplication_service_test.cpp
+++ b/src/meta/test/meta_duplication_service_test.cpp
@@ -73,17 +73,23 @@ public:
     static const std::string kTestAppName;
     static const std::string kTestRemoteClusterName;
     static const std::string kTestRemoteAppName;
+    static const int32_t kTestRemoteReplicaCount;
 
     meta_duplication_service_test() {}
 
     duplication_add_response create_dup(const std::string &app_name,
                                         const std::string &remote_cluster,
-                                        const std::string &remote_app_name)
+                                        const bool specified,
+                                        const std::string &remote_app_name,
+                                        const int32_t remote_replica_count)
     {
         auto req = std::make_unique<duplication_add_request>();
         req->app_name = app_name;
         req->remote_cluster_name = remote_cluster;
-        req->__set_remote_app_name(remote_app_name);
+        if (specified) {
+            req->__set_remote_app_name(remote_app_name);
+            req->__set_remote_replica_count(remote_replica_count);
+        }
 
         duplication_add_rpc rpc(std::move(req), RPC_CM_ADD_DUPLICATION);
         dup_svc().add_duplication(rpc);
@@ -92,14 +98,29 @@ public:
     }
 
     duplication_add_response create_dup(const std::string &app_name,
-                                        const std::string &remote_cluster)
+                                        const std::string &remote_cluster,
+                                        const std::string &remote_app_name,
+                                        const int32_t remote_replica_count)
+    {
+        return create_dup(app_name, remote_cluster, true, remote_app_name, 
remote_replica_count);
+    }
+
+    duplication_add_response create_dup_unspecified(const std::string 
&app_name,
+                                                    const std::string 
&remote_cluster)
     {
-        return create_dup(app_name, remote_cluster, app_name);
+        return create_dup(app_name, remote_cluster, false, "", 0);
+    }
+
+    duplication_add_response create_dup(const std::string &app_name,
+                                        const std::string &remote_cluster,
+                                        const int32_t remote_replica_count)
+    {
+        return create_dup(app_name, remote_cluster, app_name, 
remote_replica_count);
     }
 
     duplication_add_response create_dup(const std::string &app_name)
     {
-        return create_dup(app_name, kTestRemoteClusterName);
+        return create_dup(app_name, kTestRemoteClusterName, 
kTestRemoteReplicaCount);
     }
 
     duplication_query_response query_dup_info(const std::string &app_name)
@@ -195,17 +216,17 @@ public:
 
         int last_dup = 0;
         for (int i = 0; i < 1000; i++) {
-            auto dup =
-                dup_svc().new_dup_from_init(kTestRemoteClusterName, 
kTestRemoteAppName, {}, app);
+            auto dup = dup_svc().new_dup_from_init(
+                kTestRemoteClusterName, kTestRemoteAppName, 
kTestRemoteReplicaCount, {}, app);
 
             ASSERT_GT(dup->id, 0);
             ASSERT_FALSE(dup->is_altering());
-            ASSERT_EQ(dup->_status, duplication_status::DS_INIT);
-            ASSERT_EQ(dup->_next_status, duplication_status::DS_INIT);
+            ASSERT_EQ(duplication_status::DS_INIT, dup->_status);
+            ASSERT_EQ(duplication_status::DS_INIT, dup->_next_status);
 
             auto ent = dup->to_duplication_entry();
             for (int j = 0; j < app->partition_count; j++) {
-                ASSERT_EQ(ent.progress[j], invalid_decree);
+                ASSERT_EQ(invalid_decree, ent.progress[j]);
             }
 
             if (last_dup != 0) {
@@ -327,7 +348,7 @@ public:
         SetUp();
         create_app(kTestAppName);
         app = find_app(kTestAppName);
-        auto test_dup = create_dup(kTestAppName, kTestRemoteClusterName);
+        auto test_dup = create_dup(kTestAppName, kTestRemoteClusterName, 
kTestRemoteReplicaCount);
         ASSERT_EQ(test_dup.err, ERR_OK);
         duplication_info_s_ptr dup = app->duplications[test_dup.dupid];
         
_ms->get_meta_storage()->create_node(meta_duplication_service::get_partition_path(dup,
 "0"),
@@ -348,7 +369,7 @@ public:
         SetUp();
         create_app(kTestAppName);
         app = find_app(kTestAppName);
-        test_dup = create_dup(kTestAppName, kTestRemoteClusterName);
+        test_dup = create_dup(kTestAppName, kTestRemoteClusterName, 
kTestRemoteReplicaCount);
         ASSERT_EQ(test_dup.err, ERR_OK);
         dup = app->duplications[test_dup.dupid];
         
_ms->get_meta_storage()->create_node(meta_duplication_service::get_partition_path(dup,
 "x"),
@@ -368,47 +389,100 @@ public:
     {
         static const std::string kTestSameAppName(kTestAppName + "_same");
         static const std::string kTestAnotherAppName(kTestAppName + 
"_another");
+        static const std::string kTestUnspecifiedAppName(kTestAppName + 
"_unspecified");
 
         create_app(kTestAppName);
         create_app(kTestSameAppName);
         create_app(kTestAnotherAppName);
+        create_app(kTestUnspecifiedAppName);
 
         struct TestData
         {
             std::string app_name;
             std::string remote;
+
+            bool specified;
             std::string remote_app_name;
+            int32_t remote_replica_count;
 
             error_code wec;
         } tests[] = {
             // The general case that duplicating to remote cluster with specified remote_app_name.
-            {kTestAppName, kTestRemoteClusterName, kTestRemoteAppName, ERR_OK},
+            {kTestAppName,
+             kTestRemoteClusterName,
+             true,
+             kTestRemoteAppName,
+             kTestRemoteReplicaCount,
+             ERR_OK},
             // A duplication that has been added would be found with its original remote_app_name.
-            {kTestAppName, kTestRemoteClusterName, kTestRemoteAppName, ERR_OK},
+            {kTestAppName,
+             kTestRemoteClusterName,
+             true,
+             kTestRemoteAppName,
+             kTestRemoteReplicaCount,
+             ERR_OK},
             // The general case that duplicating to remote cluster with same remote_app_name.
-            {kTestSameAppName, kTestRemoteClusterName, kTestSameAppName, 
ERR_OK},
+            {kTestSameAppName,
+             kTestRemoteClusterName,
+             true,
+             kTestSameAppName,
+             kTestRemoteReplicaCount,
+             ERR_OK},
             // It is not allowed that remote_cluster_name does not exist in "duplication-group".
-            {kTestAppName, "test-invalid-remote", kTestRemoteAppName, 
ERR_INVALID_PARAMETERS},
+            {kTestAppName,
+             "test-invalid-remote",
+             true,
+             kTestRemoteAppName,
+             kTestRemoteReplicaCount,
+             ERR_INVALID_PARAMETERS},
             // Duplicating to local cluster is not allowed.
-            {kTestAppName, get_current_cluster_name(), kTestRemoteAppName, 
ERR_INVALID_PARAMETERS},
+            {kTestAppName,
+             get_current_cluster_name(),
+             true,
+             kTestRemoteAppName,
+             kTestRemoteReplicaCount,
+             ERR_INVALID_PARAMETERS},
             // It is not allowed that remote_cluster_name exists in "duplication-group" but not
             // exists in "pegasus.clusters".
             {kTestAppName,
              "cluster_without_address_for_test",
+             true,
              kTestRemoteAppName,
+             kTestRemoteReplicaCount,
              ERR_INVALID_PARAMETERS},
             // The attempt that duplicates another app to the same remote app would be blocked.
             {kTestAnotherAppName,
              kTestRemoteClusterName,
+             true,
              kTestRemoteAppName,
+             kTestRemoteReplicaCount,
              ERR_INVALID_PARAMETERS},
             // The attempt that duplicates another app to the different remote app would be
             // ok.
-            {kTestAnotherAppName, kTestRemoteClusterName, kTestAppName, 
ERR_OK},
+            {kTestAnotherAppName,
+             kTestRemoteClusterName,
+             true,
+             kTestAppName,
+             kTestRemoteReplicaCount,
+             ERR_OK},
+            // Add a duplication without specifying remote_app_name and remote_replica_count.
+            {kTestUnspecifiedAppName,
+             kTestRemoteClusterName,
+             false,
+             kTestUnspecifiedAppName,
+             3,
+             ERR_OK},
         };
 
         for (auto test : tests) {
-            auto resp = create_dup(test.app_name, test.remote, 
test.remote_app_name);
+            duplication_add_response resp;
+            if (test.specified) {
+                resp = create_dup(
+                    test.app_name, test.remote, test.remote_app_name, 
test.remote_replica_count);
+            } else {
+                resp = create_dup_unspecified(test.app_name, test.remote);
+            }
+
             ASSERT_EQ(test.wec, resp.err);
 
             if (test.wec != ERR_OK) {
@@ -423,6 +497,8 @@ public:
             ASSERT_EQ(test.remote, dup->remote_cluster_name);
             ASSERT_EQ(test.remote_app_name, resp.remote_app_name);
             ASSERT_EQ(test.remote_app_name, dup->remote_app_name);
+            ASSERT_EQ(test.remote_replica_count, resp.remote_replica_count);
+            ASSERT_EQ(test.remote_replica_count, dup->remote_replica_count);
             ASSERT_EQ(resp.dupid, dup->id);
             ASSERT_TRUE(app->duplicating);
         }
@@ -432,6 +508,7 @@ public:
 const std::string meta_duplication_service_test::kTestAppName = "test_app";
 const std::string meta_duplication_service_test::kTestRemoteClusterName = 
"slave-cluster";
 const std::string meta_duplication_service_test::kTestRemoteAppName = 
"remote_test_app";
+const int32_t meta_duplication_service_test::kTestRemoteReplicaCount = 3;
 
 // This test ensures that duplication upon an unavailable app will
 // be rejected with ERR_APP_NOT_EXIST.
@@ -494,14 +571,16 @@ TEST_F(meta_duplication_service_test, 
dont_create_if_existed)
     }
 }
 
-TEST_F(meta_duplication_service_test, add_dup_with_remote_app_name)
+TEST_F(meta_duplication_service_test, add_dup_with_params)
 {
     create_app(kTestAppName);
     auto app = find_app(kTestAppName);
     ASSERT_EQ(kTestAppName, app->app_name);
 
-    create_dup(kTestAppName, kTestRemoteClusterName, kTestRemoteAppName);
-    const dupid_t dupid = create_dup(kTestAppName).dupid;
+    const dupid_t dupid =
+        create_dup(
+            kTestAppName, kTestRemoteClusterName, kTestRemoteAppName, 
kTestRemoteReplicaCount)
+            .dupid;
 
     const auto &resp = query_dup_info(kTestAppName);
     ASSERT_EQ(ERR_OK, resp.err);
@@ -511,6 +590,7 @@ TEST_F(meta_duplication_service_test, 
add_dup_with_remote_app_name)
     ASSERT_EQ(dupid, duplication_entry.dupid);
     ASSERT_EQ(duplication_status::DS_PREPARE, duplication_entry.status);
     ASSERT_EQ(kTestRemoteAppName, duplication_entry.remote_app_name);
+    ASSERT_EQ(kTestRemoteReplicaCount, duplication_entry.remote_replica_count);
 }
 
 TEST_F(meta_duplication_service_test, change_duplication_status)
@@ -754,7 +834,7 @@ TEST_F(meta_duplication_service_test, 
recover_from_corrupted_meta_data)
 TEST_F(meta_duplication_service_test, query_duplication_handler)
 {
     create_app(kTestAppName);
-    create_dup(kTestAppName, kTestRemoteClusterName, kTestRemoteAppName);
+    create_dup(kTestAppName, kTestRemoteClusterName, kTestRemoteAppName, 
kTestRemoteReplicaCount);
     meta_http_service mhs(_ms.get());
 
     http_request fake_req;
@@ -776,7 +856,8 @@ TEST_F(meta_duplication_service_test, 
query_duplication_handler)
     ASSERT_EQ(std::string() + R"({"1":{"create_ts":")" + ts_buf + 
R"(","dupid":)" +
                   std::to_string(dup->id) +
                   R"(,"fail_mode":"FAIL_SLOW","remote":"slave-cluster")"
-                  
R"(,"remote_app_name":"remote_test_app","status":"DS_PREPARE"},"appid":2})",
+                  
R"(,"remote_app_name":"remote_test_app","remote_replica_count":3)"
+                  R"(,"status":"DS_PREPARE"},"appid":2})",
               fake_resp.body);
 }
 
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index b0ad80515..70fc62119 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -925,9 +925,12 @@ private:
 
 // A helper macro to parse an optional command argument, the result is filled in an uint32_t
 // variable 'value'.
-#define PARSE_OPT_UINT(name, value, def_val)                                   
                    \
+//
+// Variable arguments are `name` or `init_list` of argh::parser::operator(). See argh::parser
+// for details.
+#define PARSE_OPT_UINT(value, def_val, ...)                                    
                    \
     do {                                                                       
                    \
-        const auto param = cmd(name, (def_val)).str();                         
                    \
+        const auto param = cmd(__VA_ARGS__, (def_val)).str();                  
                    \
         if (!::dsn::buf2uint32(param, value)) {                                
                    \
             fmt::print(stderr, "invalid command, '{}' should be an unsigned 
integer\n", param);    \
             return false;                                                      
                    \
diff --git a/src/shell/commands/duplication.cpp 
b/src/shell/commands/duplication.cpp
index f974ad078..097c1ec73 100644
--- a/src/shell/commands/duplication.cpp
+++ b/src/shell/commands/duplication.cpp
@@ -33,13 +33,13 @@
 #include "duplication_types.h"
 #include "shell/argh.h"
 #include "shell/command_executor.h"
+#include "shell/command_helper.h"
 #include "shell/commands.h"
 #include "shell/sds/sds.h"
 #include "utils/error_code.h"
 #include "utils/errors.h"
 #include "utils/fmt_logging.h"
 #include "utils/output_utils.h"
-#include "utils/ports.h"
 #include "utils/string_conv.h"
 #include "utils/time_utils.h"
 
@@ -49,20 +49,14 @@ using dsn::replication::duplication_status;
 bool add_dup(command_executor *e, shell_context *sc, arguments args)
 {
     // add_dup <app_name> <remote_cluster_name> [-s|--sst] [-a|--remote_app_name str]
+    // [-r|--remote_replica_count num]
 
     argh::parser cmd(args.argc, args.argv);
-    if (cmd.pos_args().size() > 4) {
+    if (cmd.pos_args().size() > 5) {
         fmt::print(stderr, "too many params\n");
         return false;
     }
 
-    for (const auto &flag : cmd.flags()) {
-        if (dsn_unlikely(flag != "s" && flag != "sst")) {
-            fmt::print(stderr, "unknown flag {}\n", flag);
-            return false;
-        }
-    }
-
     if (!cmd(1)) {
         fmt::print(stderr, "missing param <app_name>\n");
         return false;
@@ -82,14 +76,22 @@ bool add_dup(command_executor *e, shell_context *sc, 
arguments args)
     }
 
     // Check if the boolean option is specified.
-    bool is_duplicating_checkpoint = cmd[{"-s", "--sst"}];
+    const auto is_duplicating_checkpoint = cmd[{"-s", "--sst"}];
 
     // Read the app name of the remote cluster, if any.
     // Otherwise, use app_name as the remote_app_name.
-    std::string remote_app_name(cmd({"-a", "--remote_app_name"}, 
app_name).str());
-
-    auto err_resp = sc->ddl_client->add_dup(
-        app_name, remote_cluster_name, is_duplicating_checkpoint, 
remote_app_name);
+    const std::string remote_app_name(cmd({"-a", "--remote_app_name"}, 
app_name).str());
+
+    // 0 represents that remote_replica_count is missing, which means the replica count of
+    // the remote app would be the same as the source app.
+    uint32_t remote_replica_count = 0;
+    PARSE_OPT_UINT(remote_replica_count, 0, {"-r", "--remote_replica_count"});
+
+    auto err_resp = sc->ddl_client->add_dup(app_name,
+                                            remote_cluster_name,
+                                            is_duplicating_checkpoint,
+                                            remote_app_name,
+                                            remote_replica_count);
     auto err = err_resp.get_error();
     std::string hint;
     if (err) {
diff --git a/src/shell/commands/local_partition_split.cpp 
b/src/shell/commands/local_partition_split.cpp
index 7634a4f7c..6cc4726fa 100644
--- a/src/shell/commands/local_partition_split.cpp
+++ b/src/shell/commands/local_partition_split.cpp
@@ -697,8 +697,8 @@ bool local_partition_split(command_executor *e, 
shell_context *sc, arguments arg
     PARSE_UINT(lpsc.src_partition_count);
     PARSE_UINT(lpsc.dst_partition_count);
     lpsc.dst_app_name = cmd(param_index++).str();
-    PARSE_OPT_UINT("threads_per_data_dir", lpsc.threads_per_data_dir, 1);
-    PARSE_OPT_UINT("threads_per_partition", lpsc.threads_per_partition, 1);
+    PARSE_OPT_UINT(lpsc.threads_per_data_dir, 1, "threads_per_data_dir");
+    PARSE_OPT_UINT(lpsc.threads_per_partition, 1, "threads_per_partition");
     lpsc.post_full_compact = cmd["--post_full_compact"];
     lpsc.post_count = cmd["--post_count"];
 
diff --git a/src/shell/main.cpp b/src/shell/main.cpp
index 4932a58fe..3b279824c 100644
--- a/src/shell/main.cpp
+++ b/src/shell/main.cpp
@@ -475,7 +475,8 @@ static command_executor commands[] = {
     },
     {"add_dup",
      "add duplication",
-     "<app_name> <remote_cluster_name> [-s|--sst] [-a|--remote_app_name str]",
+     "<app_name> <remote_cluster_name> [-s|--sst] [-a|--remote_app_name str] "
+     "[-r|--remote_replica_count num]",
      add_dup},
     {"query_dup", "query duplication info", "<app_name> [-d|--detail]", 
query_dup},
     {"remove_dup", "remove duplication", "<app_name> <dup_id>", remove_dup},


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to