This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c96b5ade fix(duplication): add warning message while trying to add a duplication that has been existing for the same table with the same remote cluster (#2038)
6c96b5ade is described below

commit 6c96b5ade8e635b07cfde5c80661247c1e9f5622
Author: Dan Wang <[email protected]>
AuthorDate: Tue Jun 4 11:53:05 2024 +0800

    fix(duplication): add warning message while trying to add a duplication that has been existing for the same table with the same remote cluster (#2038)
    
    https://github.com/apache/incubator-pegasus/issues/2039
---
 src/meta/duplication/meta_duplication_service.cpp | 26 +++++++++----------
 src/meta/duplication/meta_duplication_service.h   |  4 +--
 src/meta/test/meta_duplication_service_test.cpp   | 31 +++++++++++++----------
 src/shell/command_helper.h                        | 11 +++++---
 src/shell/commands/duplication.cpp                | 25 +++++++++++-------
 src/utils/error_code.h                            |  3 +++
 6 files changed, 59 insertions(+), 41 deletions(-)

diff --git a/src/meta/duplication/meta_duplication_service.cpp 
b/src/meta/duplication/meta_duplication_service.cpp
index 4c432736c..77642c1f5 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -241,6 +241,7 @@ void 
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
 
     std::shared_ptr<app_state> app;
     duplication_info_s_ptr dup;
+    error_code resp_err = ERR_OK;
     {
         zauto_read_lock l(app_lock());
 
@@ -273,13 +274,13 @@ void 
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
 
         if (dup) {
             // The duplication for the same app to the same remote cluster has 
existed.
-            remote_app_name = dup->remote_app_name;
-            remote_replica_count = dup->remote_replica_count;
-            LOG_INFO("no need to add duplication, since it has existed: 
app_name={}, "
+            resp_err = ERR_DUP_EXIST;
+            LOG_INFO("[{}] duplication has been existing: app_name={}, "
                      "remote_cluster_name={}, remote_app_name={}",
+                     dup->log_prefix(),
                      request.app_name,
                      request.remote_cluster_name,
-                     remote_app_name);
+                     dup->remote_app_name);
         } else {
             // Check if other apps of this cluster are duplicated to the same 
remote app.
             for (const auto & [ app_name, cur_app_state ] : 
_state->_exist_apps) {
@@ -313,15 +314,14 @@ void 
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
                                 app);
     }
 
-    do_add_duplication(app, dup, rpc, remote_app_name, remote_replica_count);
+    do_add_duplication(app, dup, rpc, resp_err);
 }
 
 // ThreadPool(WRITE): THREAD_POOL_META_STATE
 void meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> 
&app,
                                                   duplication_info_s_ptr &dup,
                                                   duplication_add_rpc &rpc,
-                                                  const std::string 
&remote_app_name,
-                                                  const int32_t 
remote_replica_count)
+                                                  const error_code &resp_err)
 {
     const auto &ec = dup->start(rpc.request().is_duplicating_checkpoint);
     LOG_ERROR_DUP_HINT_AND_RETURN_IF_NOT(ec == ERR_OK,
@@ -335,10 +335,8 @@ void 
meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &ap
     auto value = dup->to_json_blob();
     std::queue<std::string> nodes({get_duplication_path(*app), 
std::to_string(dup->id)});
     _meta_svc->get_meta_storage()->create_node_recursively(
-        std::move(nodes),
-        std::move(value),
-        [app, this, dup, rpc, remote_app_name, remote_replica_count]() mutable 
{
-            LOG_INFO("[{}] add duplication successfully [app_name: {}, 
follower: {}]",
+        std::move(nodes), std::move(value), [app, this, dup, rpc, resp_err]() 
mutable {
+            LOG_INFO("[{}] add duplication successfully [app_name: {}, 
remote_cluster_name: {}]",
                      dup->log_prefix(),
                      app->app_name,
                      dup->remote_cluster_name);
@@ -347,11 +345,11 @@ void 
meta_duplication_service::do_add_duplication(std::shared_ptr<app_state> &ap
             dup->persist_status();
 
             auto &resp = rpc.response();
-            resp.err = ERR_OK;
+            resp.err = resp_err;
             resp.appid = app->app_id;
             resp.dupid = dup->id;
-            resp.__set_remote_app_name(remote_app_name);
-            resp.__set_remote_replica_count(remote_replica_count);
+            resp.__set_remote_app_name(dup->remote_app_name);
+            resp.__set_remote_replica_count(dup->remote_replica_count);
 
             zauto_write_lock l(app_lock());
             refresh_duplicating_no_lock(app);
diff --git a/src/meta/duplication/meta_duplication_service.h 
b/src/meta/duplication/meta_duplication_service.h
index 2bcda880e..3f06d6326 100644
--- a/src/meta/duplication/meta_duplication_service.h
+++ b/src/meta/duplication/meta_duplication_service.h
@@ -31,6 +31,7 @@
 #include "utils/fmt_logging.h"
 
 namespace dsn {
+class error_code;
 class host_port;
 class zrwlock_nr;
 
@@ -81,8 +82,7 @@ private:
     void do_add_duplication(std::shared_ptr<app_state> &app,
                             duplication_info_s_ptr &dup,
                             duplication_add_rpc &rpc,
-                            const std::string &remote_app_name,
-                            const int32_t remote_replica_count);
+                            const error_code &resp_err);
 
     void do_modify_duplication(std::shared_ptr<app_state> &app,
                                duplication_info_s_ptr &dup,
diff --git a/src/meta/test/meta_duplication_service_test.cpp 
b/src/meta/test/meta_duplication_service_test.cpp
index 0e0ed1f40..621a34d9c 100644
--- a/src/meta/test/meta_duplication_service_test.cpp
+++ b/src/meta/test/meta_duplication_service_test.cpp
@@ -57,6 +57,7 @@
 #include "meta/server_state.h"
 #include "meta/test/misc/misc.h"
 #include "meta_test_base.h"
+#include "runtime/api_layer1.h"
 #include "runtime/rpc/rpc_address.h"
 #include "runtime/rpc/rpc_host_port.h"
 #include "utils/blob.h"
@@ -399,7 +400,7 @@ public:
         struct TestData
         {
             std::string app_name;
-            std::string remote;
+            std::string remote_cluster_name;
 
             bool specified;
             std::string remote_app_name;
@@ -414,13 +415,14 @@ public:
              kTestRemoteAppName,
              kTestRemoteReplicaCount,
              ERR_OK},
-            // A duplication that has been added would be found with its 
original remote_app_name.
+            // Add a duplication that has been existing for the same table 
with the same remote
+            // cluster.
             {kTestAppName,
              kTestRemoteClusterName,
-             true,
+             false,
              kTestRemoteAppName,
              kTestRemoteReplicaCount,
-             ERR_OK},
+             ERR_DUP_EXIST},
             // The general case that duplicating to remote cluster with same 
remote_app_name.
             {kTestSameAppName,
              kTestRemoteClusterName,
@@ -477,10 +479,12 @@ public:
         for (auto test : tests) {
             duplication_add_response resp;
             if (test.specified) {
-                resp = create_dup(
-                    test.app_name, test.remote, test.remote_app_name, 
test.remote_replica_count);
+                resp = create_dup(test.app_name,
+                                  test.remote_cluster_name,
+                                  test.remote_app_name,
+                                  test.remote_replica_count);
             } else {
-                resp = create_dup_unspecified(test.app_name, test.remote);
+                resp = create_dup_unspecified(test.app_name, 
test.remote_cluster_name);
             }
 
             ASSERT_EQ(test.wec, resp.err);
@@ -494,7 +498,7 @@ public:
             ASSERT_TRUE(dup != nullptr);
             ASSERT_EQ(app->app_id, dup->app_id);
             ASSERT_EQ(duplication_status::DS_PREPARE, dup->_status);
-            ASSERT_EQ(test.remote, dup->remote_cluster_name);
+            ASSERT_EQ(test.remote_cluster_name, dup->remote_cluster_name);
             ASSERT_EQ(test.remote_app_name, resp.remote_app_name);
             ASSERT_EQ(test.remote_app_name, dup->remote_app_name);
             ASSERT_EQ(test.remote_replica_count, resp.remote_replica_count);
@@ -524,23 +528,24 @@ TEST_F(meta_duplication_service_test, 
dup_op_upon_unavail_app)
     create_app(test_app_unavail);
     find_app(test_app_unavail)->status = app_status::AS_DROPPED;
 
-    dupid_t test_dup = create_dup(kTestAppName).dupid;
-
     struct TestData
     {
         std::string app;
-
         error_code wec;
     } tests[] = {
         {test_app_not_exist, ERR_APP_NOT_EXIST},
         {test_app_unavail, ERR_APP_NOT_EXIST},
-
         {kTestAppName, ERR_OK},
     };
 
     for (auto test : tests) {
+        const auto &resp = create_dup(test.app);
+        ASSERT_EQ(test.wec, resp.err);
+
         ASSERT_EQ(test.wec, query_dup_info(test.app).err);
-        ASSERT_EQ(test.wec, create_dup(test.app).err);
+
+        // For the response with some error, `dupid` doesn't matter.
+        dupid_t test_dup = test.wec == ERR_OK ? resp.dupid : 
static_cast<dupid_t>(dsn_now_s());
         ASSERT_EQ(test.wec,
                   change_dup_status(test.app, test_dup, 
duplication_status::DS_REMOVED).err);
     }
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index cc1b35686..965eb64df 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -71,16 +71,21 @@
                "ERROR: {}\n",                                                  
                    \
                fmt::format(msg, ##__VA_ARGS__))
 
-#define SHELL_PRINTLN_WARNING(msg, ...)                                        
                    \
+#define SHELL_PRINT_WARNING_BASE(msg, ...)                                     
                    \
     fmt::print(stdout,                                                         
                    \
                fmt::emphasis::bold | fmt::fg(fmt::color::yellow),              
                    \
-               "WARNING: {}\n",                                                
                    \
+               "WARNING: {}",                                                  
                    \
                fmt::format(msg, ##__VA_ARGS__))
 
+#define SHELL_PRINT_WARNING(msg, ...) SHELL_PRINT_WARNING_BASE(msg, 
##__VA_ARGS__)
+
+#define SHELL_PRINTLN_WARNING(msg, ...)                                        
                    \
+    SHELL_PRINT_WARNING_BASE("{}\n", fmt::format(msg, ##__VA_ARGS__))
+
 #define SHELL_PRINT_OK_BASE(msg, ...)                                          
                    \
     fmt::print(stdout, fmt::emphasis::bold | fmt::fg(fmt::color::green), msg, 
##__VA_ARGS__)
 
-#define SHELL_PRINT_OK(msg, ...) SHELL_PRINT_OK_BASE("{}", fmt::format(msg, 
##__VA_ARGS__))
+#define SHELL_PRINT_OK(msg, ...) SHELL_PRINT_OK_BASE(msg, ##__VA_ARGS__)
 
 #define SHELL_PRINTLN_OK(msg, ...) SHELL_PRINT_OK_BASE("{}\n", 
fmt::format(msg, ##__VA_ARGS__))
 
diff --git a/src/shell/commands/duplication.cpp 
b/src/shell/commands/duplication.cpp
index cd7c07ad6..82237af1a 100644
--- a/src/shell/commands/duplication.cpp
+++ b/src/shell/commands/duplication.cpp
@@ -102,7 +102,7 @@ bool add_dup(command_executor *e, shell_context *sc, 
arguments args)
         hint = err_resp.get_value().hint;
     }
 
-    if (!err) {
+    if (!err && err.code() != dsn::ERR_DUP_EXIST) {
         SHELL_PRINTLN_ERROR(
             "adding duplication failed [app_name: {}, remote_cluster_name: {}, 
"
             "is_duplicating_checkpoint: {}, remote_app_name: {}, 
remote_replica_count: {}, "
@@ -121,15 +121,22 @@ bool add_dup(command_executor *e, shell_context *sc, 
arguments args)
         return true;
     }
 
+    if (err.code() == dsn::ERR_DUP_EXIST) {
+        SHELL_PRINT_WARNING("duplication has been existing");
+    } else {
+        SHELL_PRINT_OK("adding duplication succeed");
+    }
+
     const auto &resp = err_resp.get_value();
-    SHELL_PRINT_OK(
-        "adding duplication succeed [app_name: {}, remote_cluster_name: {}, 
appid: {}, dupid: "
-        "{}, is_duplicating_checkpoint: {}",
-        app_name,
-        remote_cluster_name,
-        resp.appid,
-        resp.dupid,
-        is_duplicating_checkpoint);
+    SHELL_PRINT_OK(" [app_name: {}, remote_cluster_name: {}, appid: {}, dupid: 
{}",
+                   app_name,
+                   remote_cluster_name,
+                   resp.appid,
+                   resp.dupid);
+
+    if (err) {
+        SHELL_PRINT_OK(", is_duplicating_checkpoint: {}", 
is_duplicating_checkpoint);
+    }
 
     if (resp.__isset.remote_app_name) {
         SHELL_PRINT_OK(", remote_app_name: {}", resp.remote_app_name);
diff --git a/src/utils/error_code.h b/src/utils/error_code.h
index 04df97947..023ec2b25 100644
--- a/src/utils/error_code.h
+++ b/src/utils/error_code.h
@@ -182,6 +182,9 @@ DEFINE_ERR_CODE(ERR_RDB_CORRUPTION)
 DEFINE_ERR_CODE(ERR_DISK_IO_ERROR)
 
 DEFINE_ERR_CODE(ERR_CURL_FAILED)
+
+DEFINE_ERR_CODE(ERR_DUP_EXIST)
+
 } // namespace dsn
 
 USER_DEFINED_STRUCTURE_FORMATTER(::dsn::error_code);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to