This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ee404ebf feat(duplication): add options to support cluster name only 
used for duplication and allow any other cluster id except myself to be ignored 
(#2000)
0ee404ebf is described below

commit 0ee404ebf3549a784e28ea5e0affd6c039c4e709
Author: Dan Wang <[email protected]>
AuthorDate: Mon May 13 11:26:41 2024 +0800

    feat(duplication): add options to support cluster name only used for 
duplication and allow any other cluster id except myself to be ignored (#2000)
    
    The purpose of this PR is to optimize configurations for duplications.
    
    Firstly, many Pegasus clusters are configured with the same `cluster_name`
    (namely `[replication]cluster_name`). However, once we decide to duplicate
    tables between them, their `cluster_name` have to be changed to be 
distinguished
    from each other -- this might lead to side effects.
    
    Secondly, consider a scenario where many clusters are duplicated to a target
    cluster. This means we have to add many cluster ids to the `*.ini` file of 
the target
    cluster, and the target cluster might be restarted very frequently.
    
    Thus following options are added to solve both problems:
    
    ```diff
    [replication]
    + dup_cluster_name =
    + dup_ignore_other_cluster_ids = false
    ```
    
    `[replication]dup_cluster_name` is added only for duplication in case 
`cluster_name`
    has to be changed, while `[replication]dup_ignore_other_cluster_ids` is 
added so that
    only the target cluster id should be configured and there is no need to add 
any other
    cluster id.
---
 src/common/common.cpp                              | 23 ++++++++++-
 src/common/common.h                                | 14 ++++++-
 src/common/duplication_common.cpp                  | 44 +++++++++++++++++++---
 src/common/duplication_common.h                    | 13 +++----
 src/common/test/common_test.cpp                    | 17 ++++++++-
 src/common/test/duplication_common_test.cpp        | 44 +++++++++++++++++++---
 src/meta/duplication/meta_duplication_service.cpp  | 19 ++++++----
 src/server/pegasus_mutation_duplicator.cpp         | 27 ++++++++++---
 src/server/pegasus_write_service.cpp               |  4 +-
 src/server/pegasus_write_service.h                 | 10 -----
 src/server/pegasus_write_service_impl.h            | 10 -----
 src/server/rocksdb_wrapper.cpp                     |  4 +-
 .../test/pegasus_mutation_duplicator_test.cpp      |  7 ++--
 src/test_util/test_util.h                          |  2 +-
 src/utils/fmt_logging.h                            | 24 ++++++++++++
 15 files changed, 200 insertions(+), 62 deletions(-)

diff --git a/src/common/common.cpp b/src/common/common.cpp
index 9ea1f558d..4cc6fb7ee 100644
--- a/src/common/common.cpp
+++ b/src/common/common.cpp
@@ -21,7 +21,17 @@
 #include "utils/fmt_logging.h"
 #include "utils/strings.h"
 
-DSN_DEFINE_string(replication, cluster_name, "", "name of this cluster");
+DSN_DEFINE_string(replication, cluster_name, "", "The name of this cluster");
+
+// Many Pegasus clusters are configured with the same `cluster_name`(namely
+// `[replication]cluster_name`). However, once we decide to duplicate tables
+// between them, their `cluster_name` have to be changed to be distinguished
+// from each other -- this might lead to side effects. Thus use 
`dup_cluster_name`
+// only for duplication in case `cluster_name` has to be changed.
+DSN_DEFINE_string(replication,
+                  dup_cluster_name,
+                  "",
+                  "The name of this cluster only used for duplication");
 
 namespace dsn {
 
@@ -31,5 +41,16 @@ namespace dsn {
     return FLAGS_cluster_name;
 }
 
+/*extern*/ const char *get_current_dup_cluster_name()
+{
+    if (!utils::is_empty(FLAGS_dup_cluster_name)) {
+        return FLAGS_dup_cluster_name;
+    }
+
+    // Once `dup_cluster_name` is not configured, use cluster_name instead.
+    return get_current_cluster_name();
+}
+
 const std::string PEGASUS_CLUSTER_SECTION_NAME("pegasus.clusters");
+
 } // namespace dsn
diff --git a/src/common/common.h b/src/common/common.h
index 6ef992c6f..7965e6279 100644
--- a/src/common/common.h
+++ b/src/common/common.h
@@ -22,11 +22,21 @@
 #include <string>
 
 namespace dsn {
-/// Returns the cluster name (i.e, "onebox") if it's configured under
-/// "replication" section:
+/// Returns the cluster name ("onebox" in the following example) if it's
+/// configured under "replication" section:
 ///    [replication]
 ///      cluster_name = "onebox"
 extern const char *get_current_cluster_name();
 
+/// Returns the cluster name ("onebox" in the following example) which is
+/// only used for duplication (see the definition for `dup_cluster_name`
+/// flag for details) if it's configured under "replication" section:
+///    [replication]
+///      dup_cluster_name = "onebox"
+///
+/// However, once `[replication]dup_cluster_name` is not configured,
+/// `[replication]cluster_name` would be returned.
+extern const char *get_current_dup_cluster_name();
+
 extern const std::string PEGASUS_CLUSTER_SECTION_NAME;
 } // namespace dsn
diff --git a/src/common/duplication_common.cpp 
b/src/common/duplication_common.cpp
index 6c6e1ad03..0aea93347 100644
--- a/src/common/duplication_common.cpp
+++ b/src/common/duplication_common.cpp
@@ -19,9 +19,11 @@
 
 #include <nlohmann/json.hpp>
 #include <cstdint>
+#include <map>
 #include <utility>
 #include <vector>
 
+#include "common/common.h"
 #include "duplication_types.h"
 #include "nlohmann/detail/json_ref.hpp"
 #include "nlohmann/json_fwd.hpp"
@@ -37,6 +39,17 @@ DSN_DEFINE_uint32(replication,
                   "send mutation log batch bytes size per rpc");
 DSN_TAG_VARIABLE(duplicate_log_batch_bytes, FT_MUTABLE);
 
+// While many clusters are duplicated to a target cluster, we have to add many 
cluster
+// ids to the `*.ini` file of the target cluster, and the target cluster might 
be restarted
+// very frequently.
+//
+// This option is added so that only the target cluster id should be 
configured while
+// there is no need to add any other cluster id.
+DSN_DEFINE_bool(replication,
+                dup_ignore_other_cluster_ids,
+                false,
+                "Allow any other cluster id except myself to be ignored for 
duplication");
+
 namespace dsn {
 namespace replication {
 
@@ -89,7 +102,6 @@ public:
         return it->second;
     }
 
-    const std::map<std::string, uint8_t> &get_duplication_group() { return 
_group; }
     const std::set<uint8_t> &get_distinct_cluster_id_set() { return 
_distinct_cids; }
 
 private:
@@ -132,6 +144,22 @@ private:
     return 
internal::duplication_group_registry::instance().get_cluster_id(cluster_name);
 }
 
+/*extern*/ uint8_t get_current_dup_cluster_id_or_default()
+{
+    // Set cluster id to 0 as default if it is not configured, which means it 
would accept
+    // writes from any cluster as long as the timestamp is larger.
+    static const auto res = 
get_duplication_cluster_id(get_current_dup_cluster_name());
+    static const uint8_t cluster_id = res.is_ok() ? res.get_value() : 0;
+    return cluster_id;
+}
+
+/*extern*/ uint8_t get_current_dup_cluster_id()
+{
+    static const uint8_t cluster_id =
+        get_duplication_cluster_id(get_current_dup_cluster_name()).get_value();
+    return cluster_id;
+}
+
 // TODO(wutao1): implement our C++ version of `TSimpleJSONProtocol` if there're
 //               more cases for converting thrift to JSON
 static nlohmann::json duplication_entry_to_json(const duplication_entry &ent)
@@ -184,14 +212,20 @@ static nlohmann::json duplication_entry_to_json(const 
duplication_entry &ent)
     return json.dump();
 }
 
-/*extern*/ const std::map<std::string, uint8_t> &get_duplication_group()
+/*extern*/ const std::set<uint8_t> &get_distinct_cluster_id_set()
 {
-    return 
internal::duplication_group_registry::instance().get_duplication_group();
+    return 
internal::duplication_group_registry::instance().get_distinct_cluster_id_set();
 }
 
-/*extern*/ const std::set<uint8_t> &get_distinct_cluster_id_set()
+/*extern*/ bool is_dup_cluster_id_configured(uint8_t cluster_id)
 {
-    return 
internal::duplication_group_registry::instance().get_distinct_cluster_id_set();
+    if (cluster_id != get_current_dup_cluster_id()) {
+        if (FLAGS_dup_ignore_other_cluster_ids) {
+            return true;
+        }
+    }
+
+    return get_distinct_cluster_id_set().find(cluster_id) != 
get_distinct_cluster_id_set().end();
 }
 
 } // namespace replication
diff --git a/src/common/duplication_common.h b/src/common/duplication_common.h
index 0918d978b..fe0ec067b 100644
--- a/src/common/duplication_common.h
+++ b/src/common/duplication_common.h
@@ -20,7 +20,6 @@
 #pragma once
 
 #include <stdint.h>
-#include <map>
 #include <set>
 #include <string>
 
@@ -63,21 +62,19 @@ inline bool 
is_duplication_status_invalid(duplication_status::type status)
 /// The returned cluster id of get_duplication_cluster_id("wuhan-mi-srv-ad") 
is 3.
 extern error_with<uint8_t> get_duplication_cluster_id(const std::string 
&cluster_name);
 
+extern uint8_t get_current_dup_cluster_id_or_default();
+
+extern uint8_t get_current_dup_cluster_id();
+
 /// Returns a json string.
 extern std::string duplication_entry_to_string(const duplication_entry &dup);
 
 /// Returns a json string.
 extern std::string duplication_query_response_to_string(const 
duplication_query_response &);
 
-/// Returns a mapping from cluster_name to cluster_id.
-extern const std::map<std::string, uint8_t> &get_duplication_group();
-
 extern const std::set<uint8_t> &get_distinct_cluster_id_set();
 
-inline bool is_cluster_id_configured(uint8_t cid)
-{
-    return get_distinct_cluster_id_set().find(cid) != 
get_distinct_cluster_id_set().end();
-}
+extern bool is_dup_cluster_id_configured(uint8_t cluster_id);
 
 struct duplication_constants
 {
diff --git a/src/common/test/common_test.cpp b/src/common/test/common_test.cpp
index f645bbe70..2b2a62124 100644
--- a/src/common/test/common_test.cpp
+++ b/src/common/test/common_test.cpp
@@ -20,10 +20,25 @@
 #include "common/common.h"
 
 #include "gtest/gtest.h"
+#include "test_util/test_util.h"
+#include "utils/flags.h"
+
+DSN_DECLARE_string(dup_cluster_name);
 
 namespace dsn {
+
 TEST(duplication_common, get_current_cluster_name)
 {
-    ASSERT_STREQ(get_current_cluster_name(), "master-cluster");
+    ASSERT_STREQ("master-cluster", get_current_cluster_name());
 }
+
+TEST(duplication_common, get_current_dup_cluster_name)
+{
+    ASSERT_STREQ("master-cluster", get_current_dup_cluster_name());
+
+    PRESERVE_FLAG(dup_cluster_name);
+    FLAGS_dup_cluster_name = "slave-cluster";
+    ASSERT_STREQ("slave-cluster", get_current_dup_cluster_name());
+}
+
 } // namespace dsn
diff --git a/src/common/test/duplication_common_test.cpp 
b/src/common/test/duplication_common_test.cpp
index 8d5fa6354..11690f405 100644
--- a/src/common/test/duplication_common_test.cpp
+++ b/src/common/test/duplication_common_test.cpp
@@ -29,23 +29,57 @@
 #include <cstdint>
 
 #include "gtest/gtest.h"
+#include "test_util/test_util.h"
 #include "utils/error_code.h"
+#include "utils/flags.h"
+
+DSN_DECLARE_string(dup_cluster_name);
+DSN_DECLARE_bool(dup_ignore_other_cluster_ids);
 
 namespace dsn {
 namespace replication {
 
 TEST(duplication_common, get_duplication_cluster_id)
 {
-    ASSERT_EQ(get_duplication_cluster_id("master-cluster").get_value(), 1);
-    ASSERT_EQ(get_duplication_cluster_id("slave-cluster").get_value(), 2);
+    ASSERT_EQ(1, get_duplication_cluster_id("master-cluster").get_value());
+    ASSERT_EQ(2, get_duplication_cluster_id("slave-cluster").get_value());
+
+    ASSERT_EQ(ERR_INVALID_PARAMETERS, 
get_duplication_cluster_id("").get_error().code());
+    ASSERT_EQ(ERR_OBJECT_NOT_FOUND, 
get_duplication_cluster_id("unknown").get_error().code());
+}
+
+TEST(duplication_common, get_current_dup_cluster_id)
+{
+    ASSERT_EQ(1, get_current_dup_cluster_id());
 
-    ASSERT_EQ(get_duplication_cluster_id("").get_error().code(), 
ERR_INVALID_PARAMETERS);
-    ASSERT_EQ(get_duplication_cluster_id("unknown").get_error().code(), 
ERR_OBJECT_NOT_FOUND);
+    // Current cluster id is static, thus updating dup cluster name should 
never change
+    // current cluster id.
+    PRESERVE_FLAG(dup_cluster_name);
+    FLAGS_dup_cluster_name = "slave-cluster";
+    ASSERT_EQ(1, get_current_dup_cluster_id());
 }
 
 TEST(duplication_common, get_distinct_cluster_id_set)
 {
-    ASSERT_EQ(get_distinct_cluster_id_set(), std::set<uint8_t>({1, 2}));
+    ASSERT_EQ(std::set<uint8_t>({1, 2}), get_distinct_cluster_id_set());
+}
+
+TEST(duplication_common, is_dup_cluster_id_configured)
+{
+    ASSERT_FALSE(is_dup_cluster_id_configured(0));
+    ASSERT_TRUE(is_dup_cluster_id_configured(1));
+    ASSERT_TRUE(is_dup_cluster_id_configured(2));
+    ASSERT_FALSE(is_dup_cluster_id_configured(3));
+}
+
+TEST(duplication_common, dup_ignore_other_cluster_ids)
+{
+    PRESERVE_FLAG(dup_ignore_other_cluster_ids);
+    FLAGS_dup_ignore_other_cluster_ids = true;
+
+    for (uint8_t id = 0; id < 4; ++id) {
+        ASSERT_TRUE(is_dup_cluster_id_configured(id));
+    }
 }
 
 } // namespace replication
diff --git a/src/meta/duplication/meta_duplication_service.cpp 
b/src/meta/duplication/meta_duplication_service.cpp
index 2673f5055..c8a0d888e 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -49,11 +49,14 @@
 #include "utils/error_code.h"
 #include "utils/errors.h"
 #include "utils/fail_point.h"
+#include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/ports.h"
 #include "utils/string_conv.h"
 #include "utils/zlocks.h"
 
+DSN_DECLARE_bool(dup_ignore_other_cluster_ids);
+
 namespace dsn {
 namespace replication {
 
@@ -209,13 +212,15 @@ void 
meta_duplication_service::add_duplication(duplication_add_rpc rpc)
                                            ERR_INVALID_PARAMETERS,
                                            "illegal operation: adding 
duplication to itself");
 
-    auto remote_cluster_id = 
get_duplication_cluster_id(request.remote_cluster_name);
-    LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(remote_cluster_id.is_ok(),
-                                           response,
-                                           ERR_INVALID_PARAMETERS,
-                                           "get_duplication_cluster_id({}) 
failed, error: {}",
-                                           request.remote_cluster_name,
-                                           remote_cluster_id.get_error());
+    if (!FLAGS_dup_ignore_other_cluster_ids) {
+        auto remote_cluster_id = 
get_duplication_cluster_id(request.remote_cluster_name);
+        LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(remote_cluster_id.is_ok(),
+                                               response,
+                                               ERR_INVALID_PARAMETERS,
+                                               "get_duplication_cluster_id({}) 
failed, error: {}",
+                                               request.remote_cluster_name,
+                                               remote_cluster_id.get_error());
+    }
 
     std::vector<host_port> meta_list;
     LOG_WARNING_DUP_HINT_AND_RETURN_IF_NOT(
diff --git a/src/server/pegasus_mutation_duplicator.cpp 
b/src/server/pegasus_mutation_duplicator.cpp
index de13e8ee9..849fa56b3 100644
--- a/src/server/pegasus_mutation_duplicator.cpp
+++ b/src/server/pegasus_mutation_duplicator.cpp
@@ -32,6 +32,7 @@
 #include <vector>
 
 #include "client_lib/pegasus_client_impl.h"
+#include "common/common.h"
 #include "common/duplication_common.h"
 #include "duplication_internal_types.h"
 #include "pegasus/client.h"
@@ -40,7 +41,6 @@
 #include "rrdb/rrdb_types.h"
 #include "runtime/message_utils.h"
 #include "runtime/rpc/rpc_message.h"
-#include "server/pegasus_write_service.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
 #include "utils/chrono_literals.h"
@@ -50,6 +50,8 @@
 #include "utils/fmt_logging.h"
 #include "utils/rand.h"
 
+DSN_DECLARE_bool(dup_ignore_other_cluster_ids);
+
 METRIC_DEFINE_counter(replica,
                       dup_shipped_successful_requests,
                       dsn::metric_unit::kRequests,
@@ -127,6 +129,19 @@ 
pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::repli
     pegasus_client *client = 
pegasus_client_factory::get_client(remote_cluster.data(), app.data());
     _client = static_cast<client::pegasus_client_impl *>(client);
 
+    CHECK_STRNE_PREFIX_MSG(dsn::get_current_dup_cluster_name(),
+                           remote_cluster.data(),
+                           "remote cluster should not be myself: {}",
+                           remote_cluster);
+
+    if (FLAGS_dup_ignore_other_cluster_ids) {
+        LOG_INFO_PREFIX("initialize mutation duplicator for local cluster 
[id:{}], "
+                        "remote cluster [id:ignored, addr:{}]",
+                        dsn::replication::get_current_dup_cluster_id(),
+                        remote_cluster);
+        return;
+    }
+
     auto ret = 
dsn::replication::get_duplication_cluster_id(remote_cluster.data());
     CHECK_PREFIX_MSG(ret.is_ok(), // never possible, meta server disallows 
such remote_cluster.
                      "invalid remote cluster: {}, err_ret: {}",
@@ -136,13 +151,15 @@ 
pegasus_mutation_duplicator::pegasus_mutation_duplicator(dsn::replication::repli
 
     LOG_INFO_PREFIX("initialize mutation duplicator for local cluster [id:{}], 
"
                     "remote cluster [id:{}, addr:{}]",
-                    get_current_cluster_id(),
+                    dsn::replication::get_current_dup_cluster_id(),
                     _remote_cluster_id,
                     remote_cluster);
 
     // never possible to duplicate data to itself
-    CHECK_NE_PREFIX_MSG(
-        get_current_cluster_id(), _remote_cluster_id, "invalid remote cluster: 
{}", remote_cluster);
+    CHECK_NE_PREFIX_MSG(dsn::replication::get_current_dup_cluster_id(),
+                        _remote_cluster_id,
+                        "invalid remote cluster: {}",
+                        remote_cluster);
 }
 
 void pegasus_mutation_duplicator::send(uint64_t hash, callback cb)
@@ -237,7 +254,7 @@ void 
pegasus_mutation_duplicator::duplicate(mutation_tuple_set muts, callback cb
             entry.__set_raw_message(raw_message);
             entry.__set_task_code(rpc_code);
             entry.__set_timestamp(std::get<0>(mut));
-            entry.__set_cluster_id(get_current_cluster_id());
+            
entry.__set_cluster_id(dsn::replication::get_current_dup_cluster_id());
             batch_request->entries.emplace_back(std::move(entry));
             batch_bytes += raw_message.length();
         }
diff --git a/src/server/pegasus_write_service.cpp 
b/src/server/pegasus_write_service.cpp
index a7e9d9342..d688ab6e4 100644
--- a/src/server/pegasus_write_service.cpp
+++ b/src/server/pegasus_write_service.cpp
@@ -349,12 +349,12 @@ int pegasus_write_service::duplicate(int64_t decree,
 {
     // Verifies the cluster_id.
     for (const auto &request : requests.entries) {
-        if (!dsn::replication::is_cluster_id_configured(request.cluster_id)) {
+        if 
(!dsn::replication::is_dup_cluster_id_configured(request.cluster_id)) {
             resp.__set_error(rocksdb::Status::kInvalidArgument);
             resp.__set_error_hint("request cluster id is unconfigured");
             return empty_put(decree);
         }
-        if (request.cluster_id == get_current_cluster_id()) {
+        if (request.cluster_id == 
dsn::replication::get_current_dup_cluster_id()) {
             resp.__set_error(rocksdb::Status::kInvalidArgument);
             resp.__set_error_hint("self-duplicating");
             return empty_put(decree);
diff --git a/src/server/pegasus_write_service.h 
b/src/server/pegasus_write_service.h
index eed596e35..f430b48b0 100644
--- a/src/server/pegasus_write_service.h
+++ b/src/server/pegasus_write_service.h
@@ -22,10 +22,7 @@
 #include <cstdint>
 #include <memory>
 
-#include "common//duplication_common.h"
-#include "common/common.h"
 #include "replica/replica_base.h"
-#include "utils/errors.h"
 #include "utils/metrics.h"
 
 namespace dsn {
@@ -55,13 +52,6 @@ class ingestion_response;
 namespace pegasus {
 namespace server {
 
-inline uint8_t get_current_cluster_id()
-{
-    static const uint8_t cluster_id =
-        
dsn::replication::get_duplication_cluster_id(dsn::get_current_cluster_name()).get_value();
-    return cluster_id;
-}
-
 // The context of an mutation to the database.
 struct db_write_context
 {
diff --git a/src/server/pegasus_write_service_impl.h 
b/src/server/pegasus_write_service_impl.h
index f551b1b55..6eec4b760 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -59,16 +59,6 @@ struct db_get_context
     bool expired{false};
 };
 
-inline int get_cluster_id_if_exists()
-{
-    // cluster_id is 0 if not configured, which means it will accept writes
-    // from any cluster as long as the timestamp is larger.
-    static auto cluster_id_res =
-        
dsn::replication::get_duplication_cluster_id(dsn::get_current_cluster_name());
-    static uint64_t cluster_id = cluster_id_res.is_ok() ? 
cluster_id_res.get_value() : 0;
-    return cluster_id;
-}
-
 inline dsn::error_code get_external_files_path(const std::string 
&bulk_load_dir,
                                                const bool verify_before_ingest,
                                                const 
dsn::replication::bulk_load_metadata &metadata,
diff --git a/src/server/rocksdb_wrapper.cpp b/src/server/rocksdb_wrapper.cpp
index e4d6e3b21..59203836b 100644
--- a/src/server/rocksdb_wrapper.cpp
+++ b/src/server/rocksdb_wrapper.cpp
@@ -26,6 +26,7 @@
 
 #include "base/meta_store.h"
 #include "base/pegasus_value_schema.h"
+#include "common/duplication_common.h"
 #include "pegasus_key_schema.h"
 #include "pegasus_utils.h"
 #include "pegasus_write_service_impl.h"
@@ -121,7 +122,8 @@ int rocksdb_wrapper::write_batch_put_ctx(const 
db_write_context &ctx,
 
     uint64_t new_timetag = ctx.remote_timetag;
     if (!ctx.is_duplicated_write()) { // local write
-        new_timetag = generate_timetag(ctx.timestamp, 
get_cluster_id_if_exists(), false);
+        new_timetag = generate_timetag(
+            ctx.timestamp, 
dsn::replication::get_current_dup_cluster_id_or_default(), false);
     }
 
     if (ctx.verify_timetag &&         // needs read-before-write
diff --git a/src/server/test/pegasus_mutation_duplicator_test.cpp 
b/src/server/test/pegasus_mutation_duplicator_test.cpp
index fac7c449b..aaf91a065 100644
--- a/src/server/test/pegasus_mutation_duplicator_test.cpp
+++ b/src/server/test/pegasus_mutation_duplicator_test.cpp
@@ -44,7 +44,6 @@
 #include "runtime/message_utils.h"
 #include "runtime/rpc/rpc_holder.h"
 #include "runtime/rpc/rpc_message.h"
-#include "server/pegasus_write_service.h"
 #include "utils/blob.h"
 #include "utils/error_code.h"
 
@@ -275,9 +274,9 @@ public:
         auto duplicator = new_mutation_duplicator(&replica, "onebox2", "temp");
         duplicator->set_task_environment(&_env);
         auto duplicator_impl = dynamic_cast<pegasus_mutation_duplicator 
*>(duplicator.get());
-        ASSERT_EQ(duplicator_impl->_remote_cluster_id, 2);
-        ASSERT_EQ(duplicator_impl->_remote_cluster, "onebox2");
-        ASSERT_EQ(get_current_cluster_id(), 1);
+        ASSERT_EQ(2, duplicator_impl->_remote_cluster_id);
+        ASSERT_STREQ("onebox2", duplicator_impl->_remote_cluster.c_str());
+        ASSERT_EQ(1, get_current_dup_cluster_id());
     }
 
 private:
diff --git a/src/test_util/test_util.h b/src/test_util/test_util.h
index 46be616c9..2e2b34bb0 100644
--- a/src/test_util/test_util.h
+++ b/src/test_util/test_util.h
@@ -46,7 +46,7 @@ class file_meta;
 
 // Save the current value of a flag and restore it at the end of the function.
 #define PRESERVE_FLAG(name)                                                    
                    \
-    auto PRESERVED_FLAGS_##name = FLAGS_##name;                                
                    \
+    const auto PRESERVED_FLAGS_##name = FLAGS_##name;                          
                    \
     auto PRESERVED_FLAGS_##name##_cleanup =                                    
                    \
         dsn::defer([PRESERVED_FLAGS_##name]() { FLAGS_##name = 
PRESERVED_FLAGS_##name; })
 
diff --git a/src/utils/fmt_logging.h b/src/utils/fmt_logging.h
index 989e6eb91..97e751a48 100644
--- a/src/utils/fmt_logging.h
+++ b/src/utils/fmt_logging.h
@@ -207,6 +207,30 @@ inline const char *null_str_printer(const char *s) { 
return s == nullptr ? "(nul
 #define CHECK_NOTNULL_PREFIX_MSG(p, ...) CHECK_PREFIX_MSG(p != nullptr, 
__VA_ARGS__)
 #define CHECK_NOTNULL_PREFIX(p) CHECK_NOTNULL_PREFIX_MSG(p, "")
 
+#define CHECK_STREQ_PREFIX_MSG(var1, var2, ...)                                
                    \
+    do {                                                                       
                    \
+        const auto &_v1 = (var1);                                              
                    \
+        const auto &_v2 = (var2);                                              
                    \
+        CHECK_EXPRESSION_PREFIX_MSG(var1 == var2,                              
                    \
+                                    dsn::utils::equals(_v1, _v2),              
                    \
+                                    "{} vs {} {}",                             
                    \
+                                    null_str_printer(_v1),                     
                    \
+                                    null_str_printer(_v2),                     
                    \
+                                    fmt::format(__VA_ARGS__));                 
                    \
+    } while (false)
+
+#define CHECK_STRNE_PREFIX_MSG(var1, var2, ...)                                
                    \
+    do {                                                                       
                    \
+        const auto &_v1 = (var1);                                              
                    \
+        const auto &_v2 = (var2);                                              
                    \
+        CHECK_EXPRESSION_PREFIX_MSG(var1 != var2,                              
                    \
+                                    !dsn::utils::equals(_v1, _v2),             
                    \
+                                    "{} vs {} {}",                             
                    \
+                                    null_str_printer(_v1),                     
                    \
+                                    null_str_printer(_v2),                     
                    \
+                                    fmt::format(__VA_ARGS__));                 
                    \
+    } while (false)
+
 #define CHECK_NE_PREFIX_MSG(var1, var2, ...)                                   
                    \
     do {                                                                       
                    \
         const auto &_v1 = (var1);                                              
                    \


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to