This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new ef32de4e9 refactor(FQDN): Update src/failure_detector/fd.thrift 
related code (#1990)
ef32de4e9 is described below

commit ef32de4e9bcbbb25aa496865d9c65f283ef98c6e
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Apr 29 11:37:14 2024 +0800

    refactor(FQDN): Update src/failure_detector/fd.thrift related code (#1990)
---
 src/failure_detector/failure_detector.cpp          | 47 ++++++-----------
 .../failure_detector_multimaster.cpp               | 19 +++----
 src/failure_detector/test/failure_detector.cpp     | 23 +++-----
 src/meta/meta_server_failure_detector.cpp          | 61 ++++++++--------------
 4 files changed, 55 insertions(+), 95 deletions(-)

diff --git a/src/failure_detector/failure_detector.cpp 
b/src/failure_detector/failure_detector.cpp
index c4c6f087c..0cda09d5f 100644
--- a/src/failure_detector/failure_detector.cpp
+++ b/src/failure_detector/failure_detector.cpp
@@ -356,15 +356,12 @@ std::string failure_detector::get_allow_list(const 
std::vector<std::string> &arg
 
 void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/ 
beacon_ack &ack)
 {
-    host_port hp_from_node, hp_to_node;
+    host_port hp_from_node;
     GET_HOST_PORT(beacon, from_node, hp_from_node);
-    GET_HOST_PORT(beacon, to_node, hp_to_node);
 
     ack.time = beacon.time;
-    ack.this_node = beacon.to_node;
-    ack.__set_hp_this_node(hp_to_node);
-    ack.primary_node = dsn_primary_address();
-    ack.__set_hp_primary_node(dsn_primary_host_port());
+    SET_OBJ_IP_AND_HOST_PORT(ack, this_node, beacon, to_node);
+    SET_IP_AND_HOST_PORT(ack, primary_node, dsn_primary_address(), 
dsn_primary_host_port());
     ack.is_master = true;
     ack.allowed = true;
 
@@ -442,9 +439,8 @@ bool failure_detector::end_ping_internal(::dsn::error_code 
err, const beacon_ack
 
     if (itr == _masters.end()) {
         LOG_WARNING("received beacon ack without corresponding master, ignore 
it, "
-                    "remote_master[{}({})], local_worker[{}({})]",
-                    hp_this_node,
-                    ack.this_node,
+                    "remote_master[{}], local_worker[{}({})]",
+                    FMT_HOST_PORT_AND_IP(ack, this_node),
                     dsn_primary_host_port(),
                     dsn_primary_address());
         return false;
@@ -452,10 +448,9 @@ bool failure_detector::end_ping_internal(::dsn::error_code 
err, const beacon_ack
 
     master_record &record = itr->second;
     if (!ack.allowed) {
-        LOG_WARNING("worker rejected, stop sending beacon message, 
remote_master[{}({})], "
+        LOG_WARNING("worker rejected, stop sending beacon message, 
remote_master[{}], "
                     "local_worker[{}({})]",
-                    hp_this_node,
-                    ack.this_node,
+                    FMT_HOST_PORT_AND_IP(ack, this_node),
                     dsn_primary_host_port(),
                     dsn_primary_address());
         record.rejected = true;
@@ -478,11 +473,9 @@ bool failure_detector::end_ping_internal(::dsn::error_code 
err, const beacon_ack
 
     // if ack is not from master meta, worker should not update its last send 
time
     if (!ack.is_master) {
-        LOG_WARNING("node[{}({})] is not master, ack.primary_node[{}({})] is 
real master",
-                    hp_this_node,
-                    ack.this_node,
-                    hp_primary_node,
-                    ack.primary_node);
+        LOG_WARNING("node[{}] is not master, ack.primary_node[{}] is real 
master",
+                    FMT_HOST_PORT_AND_IP(ack, this_node),
+                    FMT_HOST_PORT_AND_IP(ack, primary_node));
         return true;
     }
 
@@ -590,17 +583,13 @@ void failure_detector::send_beacon(const host_port 
&target, uint64_t time)
     const auto &addr_target = 
dsn::dns_resolver::instance().resolve_address(target);
     beacon_msg beacon;
     beacon.time = time;
-    beacon.from_node = dsn_primary_address();
-    beacon.__set_hp_from_node(dsn_primary_host_port());
-    beacon.to_node = addr_target;
-    beacon.__set_hp_to_node(target);
+    SET_IP_AND_HOST_PORT(beacon, from_node, dsn_primary_address(), 
dsn_primary_host_port());
+    SET_IP_AND_HOST_PORT(beacon, to_node, addr_target, target);
     
beacon.__set_start_time(static_cast<int64_t>(dsn::utils::process_start_millis()));
 
-    LOG_INFO("send ping message, from[{}({})], to[{}({})], time[{}]",
-             beacon.hp_from_node,
-             beacon.from_node,
-             beacon.hp_to_node,
-             beacon.to_node,
+    LOG_INFO("send ping message, from[{}], to[{}], time[{}]",
+             FMT_HOST_PORT_AND_IP(beacon, from_node),
+             FMT_HOST_PORT_AND_IP(beacon, to_node),
              time);
 
     ::dsn::rpc::call(addr_target,
@@ -611,10 +600,8 @@ void failure_detector::send_beacon(const host_port 
&target, uint64_t time)
                          if (err != ::dsn::ERR_OK) {
                              beacon_ack ack;
                              ack.time = beacon.time;
-                             ack.this_node = beacon.to_node;
-                             ack.__set_hp_this_node(beacon.hp_to_node);
-                             ack.primary_node.set_invalid();
-                             ack.__set_hp_primary_node(host_port());
+                             SET_OBJ_IP_AND_HOST_PORT(ack, this_node, beacon, 
to_node);
+                             RESET_IP_AND_HOST_PORT(ack, primary_node);
                              ack.is_master = false;
                              ack.allowed = true;
                              end_ping(err, ack, nullptr);
diff --git a/src/failure_detector/failure_detector_multimaster.cpp 
b/src/failure_detector/failure_detector_multimaster.cpp
index 011f5acf9..e12fc69d1 100644
--- a/src/failure_detector/failure_detector_multimaster.cpp
+++ b/src/failure_detector/failure_detector_multimaster.cpp
@@ -73,17 +73,14 @@ void 
slave_failure_detector_with_multimaster::end_ping(::dsn::error_code err,
     GET_HOST_PORT(ack, this_node, hp_this_node);
     GET_HOST_PORT(ack, primary_node, hp_primary_node);
 
-    LOG_INFO(
-        "end ping result, error[{}], time[{}], ack.this_node[{}({})], 
ack.primary_node[{}({})], "
-        "ack.is_master[{}], ack.allowed[{}]",
-        err,
-        ack.time,
-        hp_this_node,
-        ack.this_node,
-        hp_primary_node,
-        ack.primary_node,
-        ack.is_master ? "true" : "false",
-        ack.allowed ? "true" : "false");
+    LOG_INFO("end ping result, error[{}], time[{}], ack.this_node[{}], 
ack.primary_node[{}], "
+             "ack.is_master[{}], ack.allowed[{}]",
+             err,
+             ack.time,
+             FMT_HOST_PORT_AND_IP(ack, this_node),
+             FMT_HOST_PORT_AND_IP(ack, primary_node),
+             ack.is_master ? "true" : "false",
+             ack.allowed ? "true" : "false");
 
     zauto_lock l(failure_detector::_lock);
     if (!failure_detector::end_ping_internal(err, ack))
diff --git a/src/failure_detector/test/failure_detector.cpp 
b/src/failure_detector/test/failure_detector.cpp
index dcfcd69ae..1e14ebece 100644
--- a/src/failure_detector/test/failure_detector.cpp
+++ b/src/failure_detector/test/failure_detector.cpp
@@ -48,7 +48,6 @@
 #include "meta/meta_server_failure_detector.h"
 #include "replica/replica_stub.h"
 #include "runtime/api_layer1.h"
-#include "runtime/rpc/dns_resolver.h"
 #include "runtime/rpc/group_host_port.h"
 #include "runtime/rpc/network.h"
 #include "runtime/rpc/rpc_address.h"
@@ -145,8 +144,8 @@ public:
         else {
             LOG_DEBUG("ignore on ping, beacon msg, time[{}], from[{}], to[{}]",
                       beacon.time,
-                      beacon.from_node,
-                      beacon.to_node);
+                      FMT_HOST_PORT_AND_IP(beacon, from_node),
+                      FMT_HOST_PORT_AND_IP(beacon, to_node));
         }
     }
 
@@ -206,7 +205,7 @@ public:
     void on_master_config(const config_master_message &request, bool &response)
     {
         LOG_DEBUG("master config, request: {}, type: {}",
-                  request.master,
+                  FMT_HOST_PORT_AND_IP(request, master),
                   request.is_register ? "reg" : "unreg");
 
         host_port hp_master;
@@ -324,9 +323,8 @@ void worker_set_leader(test_worker *worker, int 
leader_contact)
     worker->fd()->set_leader_for_test(host_port("localhost", MPORT_START + 
leader_contact));
 
     config_master_message msg;
-    msg.master = rpc_address::from_host_port("localhost", MPORT_START + 
leader_contact);
+    SET_IP_AND_HOST_PORT_BY_DNS(msg, master, host_port("localhost", 
MPORT_START + leader_contact));
     msg.is_register = true;
-    msg.__set_hp_master(host_port::from_address(msg.master));
     error_code err;
     bool response;
     std::tie(err, response) = rpc::call_wait<bool>(
@@ -336,13 +334,10 @@ void worker_set_leader(test_worker *worker, int 
leader_contact)
 
 void clear(test_worker *worker, std::vector<test_master *> masters)
 {
-    const auto &hp_leader = 
worker->fd()->get_servers().group_host_port()->leader();
-    const auto &leader = 
dsn::dns_resolver::instance().resolve_address(hp_leader);
-
     config_master_message msg;
-    msg.master = leader;
+    SET_IP_AND_HOST_PORT_BY_DNS(
+        msg, master, worker->fd()->get_servers().group_host_port()->leader());
     msg.is_register = false;
-    msg.__set_hp_master(hp_leader);
     error_code err;
     bool response;
     std::tie(err, response) = rpc::call_wait<bool>(
@@ -658,10 +653,8 @@ TEST(fd, update_stability)
 
     dsn::rpc_replier<beacon_ack> r(create_fake_rpc_response());
     beacon_msg msg;
-    msg.from_node = rpc_address::from_host_port("localhost", 123);
-    msg.__set_hp_from_node(host_port("localhost", 123));
-    msg.to_node = rpc_address::from_host_port("localhost", MPORT_START);
-    msg.__set_hp_to_node(host_port("localhost", MPORT_START));
+    SET_IP_AND_HOST_PORT_BY_DNS(msg, from_node, host_port("localhost", 123));
+    SET_IP_AND_HOST_PORT_BY_DNS(msg, to_node, host_port("localhost", 
MPORT_START));
     msg.time = dsn_now_ms();
     msg.__isset.start_time = true;
     msg.start_time = 1000;
diff --git a/src/meta/meta_server_failure_detector.cpp 
b/src/meta/meta_server_failure_detector.cpp
index e729bd87f..56cc04e46 100644
--- a/src/meta/meta_server_failure_detector.cpp
+++ b/src/meta/meta_server_failure_detector.cpp
@@ -35,7 +35,6 @@
 #include "meta/meta_options.h"
 #include "meta/meta_service.h"
 #include "runtime/app_model.h"
-#include "runtime/rpc/dns_resolver.h"
 #include "runtime/rpc/rpc_address.h"
 #include "runtime/serverlet.h"
 #include "runtime/task/task_code.h"
@@ -247,36 +246,31 @@ bool 
meta_server_failure_detector::update_stability_stat(const fd::beacon_msg &b
     } else {
         worker_stability &w = iter->second;
         if (beacon.start_time == w.last_start_time_ms) {
-            LOG_DEBUG("{}({}) isn't restarted, last_start_time({})",
-                      hp_from_node,
-                      beacon.from_node,
+            LOG_DEBUG("{} isn't restarted, last_start_time({})",
+                      FMT_HOST_PORT_AND_IP(beacon, from_node),
                       w.last_start_time_ms);
             if (dsn_now_ms() - w.last_start_time_ms >= 
FLAGS_stable_rs_min_running_seconds * 1000 &&
                 w.unstable_restart_count > 0) {
-                LOG_INFO("{}({}) has stably run for a while, reset it's 
unstable count({}) to 0",
-                         hp_from_node,
-                         beacon.from_node,
+                LOG_INFO("{} has stably run for a while, reset it's unstable 
count({}) to 0",
+                         FMT_HOST_PORT_AND_IP(beacon, from_node),
                          w.unstable_restart_count);
                 w.unstable_restart_count = 0;
             }
         } else if (beacon.start_time > w.last_start_time_ms) {
-            LOG_INFO("check {}({}) restarted, last_time({}), this_time({})",
-                     hp_from_node,
-                     beacon.from_node,
+            LOG_INFO("check {} restarted, last_time({}), this_time({})",
+                     FMT_HOST_PORT_AND_IP(beacon, from_node),
                      w.last_start_time_ms,
                      beacon.start_time);
             if (beacon.start_time - w.last_start_time_ms <
                 FLAGS_stable_rs_min_running_seconds * 1000) {
                 w.unstable_restart_count++;
-                LOG_WARNING("{}({}) encounter an unstable restart, 
total_count({})",
-                            hp_from_node,
-                            beacon.from_node,
+                LOG_WARNING("{} encounter an unstable restart, 
total_count({})",
+                            FMT_HOST_PORT_AND_IP(beacon, from_node),
                             w.unstable_restart_count);
             } else if (w.unstable_restart_count > 0) {
-                LOG_INFO("{}({}) restart in {} ms after last restart, may 
recover ok, reset "
+                LOG_INFO("{} restart in {} ms after last restart, may recover 
ok, reset "
                          "it's unstable count({}) to 0",
-                         hp_from_node,
-                         beacon.from_node,
+                         FMT_HOST_PORT_AND_IP(beacon, from_node),
                          beacon.start_time - w.last_start_time_ms,
                          w.unstable_restart_count);
                 w.unstable_restart_count = 0;
@@ -284,9 +278,8 @@ bool 
meta_server_failure_detector::update_stability_stat(const fd::beacon_msg &b
 
             w.last_start_time_ms = beacon.start_time;
         } else {
-            LOG_WARNING("{}({}): possible encounter a staled message, ignore 
it",
-                        hp_from_node,
-                        beacon.from_node);
+            LOG_WARNING("{}: possible encounter a staled message, ignore it",
+                        FMT_HOST_PORT_AND_IP(beacon, from_node));
         }
         return w.unstable_restart_count < FLAGS_max_succssive_unstable_restart;
     }
@@ -295,44 +288,34 @@ bool 
meta_server_failure_detector::update_stability_stat(const fd::beacon_msg &b
 void meta_server_failure_detector::on_ping(const fd::beacon_msg &beacon,
                                            rpc_replier<fd::beacon_ack> &reply)
 {
-    host_port hp_from_node, hp_to_node;
-    GET_HOST_PORT(beacon, from_node, hp_from_node);
-    GET_HOST_PORT(beacon, to_node, hp_to_node);
-
     if (beacon.__isset.start_time && !update_stability_stat(beacon)) {
-        LOG_WARNING(
-            "{}({}) is unstable, don't response to it's beacon", 
beacon.from_node, hp_from_node);
+        LOG_WARNING("{} is unstable, don't response to it's beacon",
+                    FMT_HOST_PORT_AND_IP(beacon, from_node));
         return;
     }
 
     fd::beacon_ack ack;
     ack.time = beacon.time;
-    ack.this_node = beacon.to_node;
+    SET_OBJ_IP_AND_HOST_PORT(ack, this_node, beacon, to_node);
     ack.allowed = true;
-    ack.__set_hp_this_node(hp_to_node);
 
     dsn::host_port leader;
     if (!get_leader(&leader)) {
         ack.is_master = false;
-        ack.primary_node = 
dsn::dns_resolver::instance().resolve_address(leader);
-        ack.__set_hp_primary_node(leader);
+        SET_IP_AND_HOST_PORT_BY_DNS(ack, primary_node, leader);
     } else {
         ack.is_master = true;
-        ack.primary_node = beacon.to_node;
-        ack.__set_hp_primary_node(hp_to_node);
+        SET_OBJ_IP_AND_HOST_PORT(ack, primary_node, beacon, to_node);
         failure_detector::on_ping_internal(beacon, ack);
     }
 
-    LOG_INFO("on_ping, beacon send time[{}], is_master({}), from_node({}({})), 
this_node({}({})), "
-             "primary_node({}({}))",
+    LOG_INFO("on_ping, beacon send time[{}], is_master({}), from_node({}), 
this_node({}), "
+             "primary_node({})",
              ack.time,
              ack.is_master ? "true" : "false",
-             hp_from_node,
-             beacon.from_node,
-             hp_to_node,
-             beacon.to_node,
-             ack.hp_primary_node,
-             ack.primary_node);
+             FMT_HOST_PORT_AND_IP(beacon, from_node),
+             FMT_HOST_PORT_AND_IP(beacon, to_node),
+             FMT_HOST_PORT_AND_IP(ack, primary_node));
 
     reply(ack);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to