This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ad827786 refactor(FQDN): Update src/common/consensus.thrift related code (#1989)
9ad827786 is described below

commit 9ad82778624cdc6a46ee5488df00eaa361bdada8
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Apr 29 10:51:19 2024 +0800

    refactor(FQDN): Update src/common/consensus.thrift related code (#1989)
---
 .../duplication/test/replica_follower_test.cpp     |   4 +-
 src/replica/replica_check.cpp                      |  13 +-
 src/replica/replica_chkpt.cpp                      |   5 +-
 src/replica/replica_learn.cpp                      | 141 +++++++++------------
 src/replica/split/replica_split_manager.cpp        |   5 +-
 src/replica/split/test/replica_split_test.cpp      |   9 +-
 6 files changed, 73 insertions(+), 104 deletions(-)

diff --git a/src/replica/duplication/test/replica_follower_test.cpp b/src/replica/duplication/test/replica_follower_test.cpp
index 34585a982..15728386e 100644
--- a/src/replica/duplication/test/replica_follower_test.cpp
+++ b/src/replica/duplication/test/replica_follower_test.cpp
@@ -32,7 +32,6 @@
 #include "nfs/nfs_node.h"
 #include "replica/duplication/replica_follower.h"
 #include "replica/test/mock_utils.h"
-#include "runtime/rpc/dns_resolver.h"
 #include "runtime/rpc/rpc_address.h"
 #include "runtime/rpc/rpc_host_port.h"
 #include "runtime/task/task_tracker.h"
@@ -278,8 +277,7 @@ TEST_P(replica_follower_test, test_nfs_copy_checkpoint)
 
     auto resp = learn_response();
     const host_port learnee("localhost", 34801);
-    resp.learnee = dsn::dns_resolver::instance().resolve_address(learnee);
-    resp.__set_hp_learnee(learnee);
+    SET_IP_AND_HOST_PORT_BY_DNS(resp, learnee, learnee);
 
     std::string dest = utils::filesystem::path_combine(
         _mock_replica->dir(), 
duplication_constants::kDuplicationCheckpointRootDir);
diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp
index 22148c2bd..c4a86dec5 100644
--- a/src/replica/replica_check.cpp
+++ b/src/replica/replica_check.cpp
@@ -122,16 +122,16 @@ void replica::broadcast_group_check()
     }
 
     for (auto it = _primary_states.statuses.begin(); it != 
_primary_states.statuses.end(); ++it) {
-        if (it->first == _stub->primary_host_port())
+        if (it->first == _stub->primary_host_port()) {
             continue;
+        }
 
         auto hp = it->first;
-        auto addr = dsn::dns_resolver::instance().resolve_address(hp);
         std::shared_ptr<group_check_request> request(new group_check_request);
 
         request->app = _app_info;
-        request->node = addr;
-        request->__set_hp_node(hp);
+        const auto addr = dsn::dns_resolver::instance().resolve_address(hp);
+        SET_IP_AND_HOST_PORT(*request, node, addr, hp);
         _primary_states.get_replica_config(it->second, request->config);
         request->last_committed_decree = last_committed_decree();
         
request->__set_confirmed_decree(_duplication_mgr->min_confirmed_decree());
@@ -226,8 +226,7 @@ void replica::on_group_check(const group_check_request 
&request,
     }
 
     response.pid = get_gpid();
-    response.node = _stub->primary_address();
-    response.__set_hp_node(_stub->primary_host_port());
+    SET_IP_AND_HOST_PORT(response, node, _stub->primary_address(), 
_stub->primary_host_port());
     response.err = ERR_OK;
     if (status() == partition_status::PS_ERROR) {
         response.err = ERR_INVALID_STATE;
@@ -253,7 +252,7 @@ void replica::on_group_check_reply(error_code err,
     }
 
     auto r = _primary_states.group_check_pending_replies.erase(hp_node);
-    CHECK_EQ_MSG(r, 1, "invalid node address, address = {}({})", hp_node, 
req->node);
+    CHECK_EQ_MSG(r, 1, "invalid node: {}", FMT_HOST_PORT_AND_IP(*req, node));
 
     if (err != ERR_OK || resp->err != ERR_OK) {
         if (ERR_OK == err) {
diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp
index fc2b3a4dc..27346aa17 100644
--- a/src/replica/replica_chkpt.cpp
+++ b/src/replica/replica_chkpt.cpp
@@ -49,6 +49,7 @@
 #include "runtime/api_layer1.h"
 #include "runtime/rpc/rpc_address.h"
 #include "runtime/rpc/rpc_holder.h"
+#include "runtime/rpc/rpc_host_port.h"
 #include "runtime/task/async_calls.h"
 #include "runtime/task/task.h"
 #include "split/replica_split_manager.h"
@@ -261,8 +262,8 @@ void replica::on_query_last_checkpoint(/*out*/ 
learn_response &response)
         // for example: base_local_dir = "./data" + "checkpoint.1024" = 
"./data/checkpoint.1024"
         response.base_local_dir = utils::filesystem::path_combine(
             _app->data_dir(), 
checkpoint_folder(response.state.to_decree_included));
-        response.learnee = _stub->primary_address();
-        response.__set_hp_learnee(_stub->primary_host_port());
+        SET_IP_AND_HOST_PORT(
+            response, learnee, _stub->primary_address(), 
_stub->primary_host_port());
         for (auto &file : response.state.files) {
             // response.state.files contain file absolute path, for example:
             // "./data/checkpoint.1024/1.sst" use `substr` to get the file 
name: 1.sst
diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp
index 9594824cf..bb8414ce4 100644
--- a/src/replica/replica_learn.cpp
+++ b/src/replica/replica_learn.cpp
@@ -233,8 +233,7 @@ void replica::init_learn(uint64_t signature)
     request.__set_max_gced_decree(get_max_gced_decree_for_learn());
     request.last_committed_decree_in_app = _app->last_committed_decree();
     request.last_committed_decree_in_prepare_list = 
_prepare_list->last_committed_decree();
-    request.learner = _stub->primary_address();
-    request.__set_hp_learner(_stub->primary_host_port());
+    SET_IP_AND_HOST_PORT(request, learner, _stub->primary_address(), 
_stub->primary_host_port());
     request.signature = _potential_secondary_states.learning_version;
     _app->prepare_get_checkpoint(request.app_specific_learn_request);
 
@@ -399,15 +398,13 @@ void replica::on_learn(dsn::message_ex *msg, const 
learn_request &request)
     // TODO: learner machine has been down for a long time, and DDD MUST 
happened before
     // which leads to state lost. Now the lost state is back, what shall we do?
     if (request.last_committed_decree_in_app > last_prepared_decree()) {
-        LOG_ERROR_PREFIX(
-            "on_learn[{:#018x}]: learner = {}({}), learner state is newer than 
learnee, "
-            "learner_app_committed_decree = {}, local_committed_decree = {}, 
learn "
-            "from scratch",
-            request.signature,
-            hp_learner,
-            request.learner,
-            request.last_committed_decree_in_app,
-            local_committed_decree);
+        LOG_ERROR_PREFIX("on_learn[{:#018x}]: learner = {}, learner state is 
newer than learnee, "
+                         "learner_app_committed_decree = {}, 
local_committed_decree = {}, learn "
+                         "from scratch",
+                         request.signature,
+                         FMT_HOST_PORT_AND_IP(request, learner),
+                         request.last_committed_decree_in_app,
+                         local_committed_decree);
 
         *(decree *)&request.last_committed_decree_in_app = 0;
     }
@@ -416,29 +413,25 @@ void replica::on_learn(dsn::message_ex *msg, const 
learn_request &request)
     // this happens when the new primary does not commit the previously 
prepared mutations
     // yet, which it should do, so let's help it now.
     else if (request.last_committed_decree_in_app > local_committed_decree) {
-        LOG_ERROR_PREFIX(
-            "on_learn[{:#018x}]: learner = {}({}), learner's 
last_committed_decree_in_app "
-            "is newer than learnee, learner_app_committed_decree = {}, "
-            "local_committed_decree = {}, commit local soft",
-            request.signature,
-            hp_learner,
-            request.learner,
-            request.last_committed_decree_in_app,
-            local_committed_decree);
+        LOG_ERROR_PREFIX("on_learn[{:#018x}]: learner = {}, learner's 
last_committed_decree_in_app "
+                         "is newer than learnee, learner_app_committed_decree 
= {}, "
+                         "local_committed_decree = {}, commit local soft",
+                         request.signature,
+                         FMT_HOST_PORT_AND_IP(request, learner),
+                         request.last_committed_decree_in_app,
+                         local_committed_decree);
 
         // we shouldn't commit mutations hard coz these mutations may 
preparing on another learner
         _prepare_list->commit(request.last_committed_decree_in_app, 
COMMIT_TO_DECREE_SOFT);
         local_committed_decree = last_committed_decree();
 
         if (request.last_committed_decree_in_app > local_committed_decree) {
-            LOG_ERROR_PREFIX(
-                "on_learn[{:#018x}]: try to commit primary to {}, still less 
than "
-                "learner({}({}))'s committed decree({}), wait mutations to be 
commitable",
-                request.signature,
-                local_committed_decree,
-                hp_learner,
-                request.learner,
-                request.last_committed_decree_in_app);
+            LOG_ERROR_PREFIX("on_learn[{:#018x}]: try to commit primary to {}, 
still less than "
+                             "learner({})'s committed decree({}), wait 
mutations to be commitable",
+                             request.signature,
+                             local_committed_decree,
+                             FMT_HOST_PORT_AND_IP(request, learner),
+                             request.last_committed_decree_in_app);
             response.err = ERR_INCONSISTENT_STATE;
             reply(msg, response);
             return;
@@ -451,13 +444,12 @@ void replica::on_learn(dsn::message_ex *msg, const 
learn_request &request)
     response.state.__set_learn_start_decree(learn_start_decree);
     bool delayed_replay_prepare_list = false;
 
-    LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}), 
remote_committed_decree = {}, "
+    LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, remote_committed_decree 
= {}, "
                     "remote_app_committed_decree = {}, local_committed_decree 
= {}, "
                     "app_committed_decree = {}, app_durable_decree = {}, "
                     "prepare_min_decree = {}, prepare_list_count = {}, 
learn_start_decree = {}",
                     request.signature,
-                    hp_learner,
-                    request.learner,
+                    FMT_HOST_PORT_AND_IP(request, learner),
                     request.last_committed_decree_in_prepare_list,
                     request.last_committed_decree_in_app,
                     local_committed_decree,
@@ -466,9 +458,7 @@ void replica::on_learn(dsn::message_ex *msg, const 
learn_request &request)
                     _prepare_list->min_decree(),
                     _prepare_list->count(),
                     learn_start_decree);
-
-    response.learnee = _stub->primary_address();
-    response.__set_hp_learnee(_stub->primary_host_port());
+    SET_IP_AND_HOST_PORT(response, learnee, _stub->primary_address(), 
_stub->primary_host_port());
     response.prepare_start_decree = invalid_decree;
     response.last_committed_decree = local_committed_decree;
     response.err = ERR_OK;
@@ -482,36 +472,32 @@ void replica::on_learn(dsn::message_ex *msg, const 
learn_request &request)
                                                          
delayed_replay_prepare_list);
     if (!should_learn_cache) {
         if (learn_start_decree > _app->last_durable_decree()) {
-            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}), choose to 
learn private logs, "
+            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn 
private logs, "
                             "because learn_start_decree({}) > 
_app->last_durable_decree({})",
                             request.signature,
-                            hp_learner,
-                            request.learner,
+                            FMT_HOST_PORT_AND_IP(request, learner),
                             learn_start_decree,
                             _app->last_durable_decree());
             _private_log->get_learn_state(get_gpid(), learn_start_decree, 
response.state);
             response.type = learn_type::LT_LOG;
         } else if (_private_log->get_learn_state(get_gpid(), 
learn_start_decree, response.state)) {
-            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}), choose to 
learn private logs, "
+            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn 
private logs, "
                             "because mutation_log::get_learn_state() returns 
true",
                             request.signature,
-                            hp_learner,
-                            request.learner);
+                            FMT_HOST_PORT_AND_IP(request, learner));
             response.type = learn_type::LT_LOG;
         } else if (learn_start_decree < request.last_committed_decree_in_app + 
1) {
-            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}), choose to 
learn private logs, "
+            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn 
private logs, "
                             "because learn_start_decree steps back for 
duplication",
                             request.signature,
-                            hp_learner,
-                            request.learner);
+                            FMT_HOST_PORT_AND_IP(request, learner));
             response.type = learn_type::LT_LOG;
         } else {
-            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}), choose to 
learn app, beacuse "
+            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, choose to learn 
app, beacuse "
                             "learn_start_decree({}) <= 
_app->last_durable_decree({}), and "
                             "mutation_log::get_learn_state() returns false",
                             request.signature,
-                            hp_learner,
-                            request.learner,
+                            FMT_HOST_PORT_AND_IP(request, learner),
                             learn_start_decree,
                             _app->last_durable_decree());
             response.type = learn_type::LT_APP;
@@ -523,11 +509,10 @@ void replica::on_learn(dsn::message_ex *msg, const 
learn_request &request)
             if (response.state.files.size() > 0) {
                 auto &last_file = response.state.files.back();
                 if (last_file == learner_state.last_learn_log_file) {
-                    LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}), 
learn the same file {} "
+                    LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, learn 
the same file {} "
                                     "repeatedly, hint to switch file",
                                     request.signature,
-                                    hp_learner,
-                                    request.learner,
+                                    FMT_HOST_PORT_AND_IP(request, learner),
                                     last_file);
                     _private_log->hint_switch_file();
                 } else {
@@ -536,12 +521,11 @@ void replica::on_learn(dsn::message_ex *msg, const 
learn_request &request)
             }
             // it is safe to commit to last_committed_decree() now
             response.state.to_decree_included = last_committed_decree();
-            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}({}), learn 
private logs succeed, "
+            LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, learn private 
logs succeed, "
                             "learned_meta_size = {}, learned_file_count = {}, 
to_decree_included = "
                             "{}",
                             request.signature,
-                            hp_learner,
-                            request.learner,
+                            FMT_HOST_PORT_AND_IP(request, learner),
                             response.state.meta.length(),
                             response.state.files.size(),
                             response.state.to_decree_included);
@@ -552,20 +536,18 @@ void replica::on_learn(dsn::message_ex *msg, const 
learn_request &request)
             if (err != ERR_OK) {
                 response.err = ERR_GET_LEARN_STATE_FAILED;
                 LOG_ERROR_PREFIX(
-                    "on_learn[{:#018x}]: learner = {}({}), get app checkpoint 
failed, error = {}",
+                    "on_learn[{:#018x}]: learner = {}, get app checkpoint 
failed, error = {}",
                     request.signature,
-                    hp_learner,
-                    request.learner,
+                    FMT_HOST_PORT_AND_IP(request, learner),
                     err);
             } else {
                 response.base_local_dir = _app->data_dir();
                 response.__set_replica_disk_tag(_dir_node->tag);
                 LOG_INFO_PREFIX(
-                    "on_learn[{:#018x}]: learner = {}({}), get app learn state 
succeed, "
+                    "on_learn[{:#018x}]: learner = {}, get app learn state 
succeed, "
                     "learned_meta_size = {}, learned_file_count = {}, 
learned_to_decree = {}",
                     request.signature,
-                    hp_learner,
-                    request.learner,
+                    FMT_HOST_PORT_AND_IP(request, learner),
                     response.state.meta.length(),
                     response.state.files.size(),
                     response.state.to_decree_included);
@@ -981,7 +963,7 @@ bool replica::prepare_cached_learn_state(const 
learn_request &request,
 
             LOG_INFO_PREFIX("on_learn[{:#018x}]: learner = {}, set 
prepare_start_decree = {}",
                             request.signature,
-                            request.learner,
+                            FMT_HOST_PORT_AND_IP(request, learner),
                             local_committed_decree + 1);
         }
 
@@ -1007,7 +989,7 @@ bool replica::prepare_cached_learn_state(const 
learn_request &request,
                         "learn_start_decree = {}, prepare_start_decree = {}, 
learn_mutation_count "
                         "= {}, learn_data_size = {}",
                         request.signature,
-                        request.learner,
+                        FMT_HOST_PORT_AND_IP(request, learner),
                         learn_start_decree,
                         response.prepare_start_decree,
                         count,
@@ -1302,8 +1284,7 @@ void replica::notify_learn_completion()
     report.last_committed_decree_in_prepare_list = last_committed_decree();
     report.learner_signature = _potential_secondary_states.learning_version;
     report.learner_status_ = _potential_secondary_states.learning_status;
-    report.node = _stub->primary_address();
-    report.__set_hp_node(_stub->primary_host_port());
+    SET_IP_AND_HOST_PORT(report, node, _stub->primary_address(), 
_stub->primary_host_port());
 
     LOG_INFO_PREFIX("notify_learn_completion[{:#018x}]: learnee = {}, 
learn_duration = {} ms, "
                     "local_committed_decree = {}, app_committed_decree = {}, 
app_durable_decree = "
@@ -1344,41 +1325,35 @@ void replica::on_learn_completion_notification(const 
group_check_response &repor
     GET_HOST_PORT(report, node, hp_node);
 
     LOG_INFO_PREFIX(
-        "on_learn_completion_notification[{:#018x}]: learner = {}({}), 
learning_status = {}",
+        "on_learn_completion_notification[{:#018x}]: learner = {}, 
learning_status = {}",
         report.learner_signature,
-        hp_node,
-        report.node,
+        FMT_HOST_PORT_AND_IP(report, node),
         enum_to_string(report.learner_status_));
 
     if (status() != partition_status::PS_PRIMARY) {
         response.err = (partition_status::PS_INACTIVE == status() && 
_inactive_is_transient)
                            ? ERR_INACTIVE_STATE
                            : ERR_INVALID_STATE;
-        LOG_ERROR_PREFIX(
-            "on_learn_completion_notification[{:#018x}]: learner = {}({}), 
this replica "
-            "is not primary, but {}, reply {}",
-            report.learner_signature,
-            hp_node,
-            report.node,
-            enum_to_string(status()),
-            response.err);
+        LOG_ERROR_PREFIX("on_learn_completion_notification[{:#018x}]: learner 
= {}, this replica "
+                         "is not primary, but {}, reply {}",
+                         report.learner_signature,
+                         FMT_HOST_PORT_AND_IP(report, node),
+                         enum_to_string(status()),
+                         response.err);
     } else if (report.learner_status_ != learner_status::LearningSucceeded) {
         response.err = ERR_INVALID_STATE;
-        LOG_ERROR_PREFIX(
-            "on_learn_completion_notification[{:#018x}]: learner = {}({}), 
learner_status "
-            "is not LearningSucceeded, but {}, reply ERR_INVALID_STATE",
-            report.learner_signature,
-            hp_node,
-            report.node,
-            enum_to_string(report.learner_status_));
+        LOG_ERROR_PREFIX("on_learn_completion_notification[{:#018x}]: learner 
= {}, learner_status "
+                         "is not LearningSucceeded, but {}, reply 
ERR_INVALID_STATE",
+                         report.learner_signature,
+                         FMT_HOST_PORT_AND_IP(report, node),
+                         enum_to_string(report.learner_status_));
     } else {
         response.err = handle_learning_succeeded_on_primary(hp_node, 
report.learner_signature);
         if (response.err != ERR_OK) {
-            LOG_ERROR_PREFIX("on_learn_completion_notification[{:#018x}]: 
learner = {}({}), handle "
+            LOG_ERROR_PREFIX("on_learn_completion_notification[{:#018x}]: 
learner = {}, handle "
                              "learning succeeded on primary failed, reply {}",
                              report.learner_signature,
-                             hp_node,
-                             report.node,
+                             FMT_HOST_PORT_AND_IP(report, node),
                              response.err);
         }
     }
diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp
index 955dbea23..9844c8ee6 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -1496,8 +1496,9 @@ void 
replica_split_manager::primary_parent_handle_stop_split(
         return;
     }
 
-    _replica->_primary_states.split_stopped_secondary.insert(
-        req->__isset.hp_node ? req->hp_node : 
host_port::from_address(req->node));
+    host_port secondary;
+    GET_HOST_PORT(*req, node, secondary);
+    
_replica->_primary_states.split_stopped_secondary.emplace(std::move(secondary));
     auto count = 0;
     for (auto &iter : _replica->_primary_states.statuses) {
         if (iter.second == partition_status::PS_SECONDARY &&
diff --git a/src/replica/split/test/replica_split_test.cpp b/src/replica/split/test/replica_split_test.cpp
index d6f1ee5b4..61f801078 100644
--- a/src/replica/split/test/replica_split_test.cpp
+++ b/src/replica/split/test/replica_split_test.cpp
@@ -40,7 +40,6 @@
 #include "replica/split/replica_split_manager.h"
 #include "replica/test/mock_utils.h"
 #include "replica/test/replica_test_base.h"
-#include "runtime/rpc/dns_resolver.h"
 #include "runtime/rpc/rpc_address.h"
 #include "runtime/rpc/rpc_host_port.h"
 #include "runtime/task/task.h"
@@ -396,8 +395,7 @@ public:
         req.app = _parent_replica->_app_info;
         req.config.ballot = INIT_BALLOT;
         req.config.status = partition_status::PS_SECONDARY;
-        req.node = SECONDARY_ADDR;
-        req.__set_hp_node(SECONDARY);
+        SET_IP_AND_HOST_PORT_BY_DNS(req, node, SECONDARY);
         if (meta_split_status == split_status::PAUSING ||
             meta_split_status == split_status::CANCELING) {
             req.__set_meta_split_status(meta_split_status);
@@ -429,8 +427,7 @@ public:
 
         std::shared_ptr<group_check_request> req = 
std::make_shared<group_check_request>();
         std::shared_ptr<group_check_response> resp = 
std::make_shared<group_check_response>();
-        req->node = SECONDARY_ADDR;
-        req->__set_hp_node(SECONDARY);
+        SET_IPS_AND_HOST_PORTS_BY_DNS(*req, node, SECONDARY);
         if (meta_split_status != split_status::NOT_SPLIT) {
             req->__set_meta_split_status(meta_split_status);
         }
@@ -532,9 +529,7 @@ public:
 
     const host_port PRIMARY = host_port("localhost", 18230);
     const host_port SECONDARY = host_port("localhost", 10058);
-    const rpc_address SECONDARY_ADDR = 
dsn::dns_resolver::instance().resolve_address(SECONDARY);
     const host_port SECONDARY2 = host_port("localhost", 10805);
-    const rpc_address SECONDARY_ADDR2 = 
dsn::dns_resolver::instance().resolve_address(SECONDARY2);
     const gpid PARENT_GPID = gpid(APP_ID, 1);
     const gpid CHILD_GPID = gpid(APP_ID, 9);
     const ballot INIT_BALLOT = 3;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to