This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new ef32de4e9 refactor(FQDN): Update src/failure_detector/fd.thrift related code (#1990)
ef32de4e9 is described below
commit ef32de4e9bcbbb25aa496865d9c65f283ef98c6e
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Apr 29 11:37:14 2024 +0800
refactor(FQDN): Update src/failure_detector/fd.thrift related code (#1990)
---
src/failure_detector/failure_detector.cpp | 47 ++++++-----------
.../failure_detector_multimaster.cpp | 19 +++----
src/failure_detector/test/failure_detector.cpp | 23 +++-----
src/meta/meta_server_failure_detector.cpp | 61 ++++++++--------------
4 files changed, 55 insertions(+), 95 deletions(-)
diff --git a/src/failure_detector/failure_detector.cpp b/src/failure_detector/failure_detector.cpp
index c4c6f087c..0cda09d5f 100644
--- a/src/failure_detector/failure_detector.cpp
+++ b/src/failure_detector/failure_detector.cpp
@@ -356,15 +356,12 @@ std::string failure_detector::get_allow_list(const
std::vector<std::string> &arg
void failure_detector::on_ping_internal(const beacon_msg &beacon, /*out*/
beacon_ack &ack)
{
- host_port hp_from_node, hp_to_node;
+ host_port hp_from_node;
GET_HOST_PORT(beacon, from_node, hp_from_node);
- GET_HOST_PORT(beacon, to_node, hp_to_node);
ack.time = beacon.time;
- ack.this_node = beacon.to_node;
- ack.__set_hp_this_node(hp_to_node);
- ack.primary_node = dsn_primary_address();
- ack.__set_hp_primary_node(dsn_primary_host_port());
+ SET_OBJ_IP_AND_HOST_PORT(ack, this_node, beacon, to_node);
+ SET_IP_AND_HOST_PORT(ack, primary_node, dsn_primary_address(),
dsn_primary_host_port());
ack.is_master = true;
ack.allowed = true;
@@ -442,9 +439,8 @@ bool failure_detector::end_ping_internal(::dsn::error_code
err, const beacon_ack
if (itr == _masters.end()) {
LOG_WARNING("received beacon ack without corresponding master, ignore
it, "
- "remote_master[{}({})], local_worker[{}({})]",
- hp_this_node,
- ack.this_node,
+ "remote_master[{}], local_worker[{}({})]",
+ FMT_HOST_PORT_AND_IP(ack, this_node),
dsn_primary_host_port(),
dsn_primary_address());
return false;
@@ -452,10 +448,9 @@ bool failure_detector::end_ping_internal(::dsn::error_code
err, const beacon_ack
master_record &record = itr->second;
if (!ack.allowed) {
- LOG_WARNING("worker rejected, stop sending beacon message,
remote_master[{}({})], "
+ LOG_WARNING("worker rejected, stop sending beacon message,
remote_master[{}], "
"local_worker[{}({})]",
- hp_this_node,
- ack.this_node,
+ FMT_HOST_PORT_AND_IP(ack, this_node),
dsn_primary_host_port(),
dsn_primary_address());
record.rejected = true;
@@ -478,11 +473,9 @@ bool failure_detector::end_ping_internal(::dsn::error_code
err, const beacon_ack
// if ack is not from master meta, worker should not update its last send
time
if (!ack.is_master) {
- LOG_WARNING("node[{}({})] is not master, ack.primary_node[{}({})] is
real master",
- hp_this_node,
- ack.this_node,
- hp_primary_node,
- ack.primary_node);
+ LOG_WARNING("node[{}] is not master, ack.primary_node[{}] is real
master",
+ FMT_HOST_PORT_AND_IP(ack, this_node),
+ FMT_HOST_PORT_AND_IP(ack, primary_node));
return true;
}
@@ -590,17 +583,13 @@ void failure_detector::send_beacon(const host_port
&target, uint64_t time)
const auto &addr_target =
dsn::dns_resolver::instance().resolve_address(target);
beacon_msg beacon;
beacon.time = time;
- beacon.from_node = dsn_primary_address();
- beacon.__set_hp_from_node(dsn_primary_host_port());
- beacon.to_node = addr_target;
- beacon.__set_hp_to_node(target);
+ SET_IP_AND_HOST_PORT(beacon, from_node, dsn_primary_address(),
dsn_primary_host_port());
+ SET_IP_AND_HOST_PORT(beacon, to_node, addr_target, target);
beacon.__set_start_time(static_cast<int64_t>(dsn::utils::process_start_millis()));
- LOG_INFO("send ping message, from[{}({})], to[{}({})], time[{}]",
- beacon.hp_from_node,
- beacon.from_node,
- beacon.hp_to_node,
- beacon.to_node,
+ LOG_INFO("send ping message, from[{}], to[{}], time[{}]",
+ FMT_HOST_PORT_AND_IP(beacon, from_node),
+ FMT_HOST_PORT_AND_IP(beacon, to_node),
time);
::dsn::rpc::call(addr_target,
@@ -611,10 +600,8 @@ void failure_detector::send_beacon(const host_port
&target, uint64_t time)
if (err != ::dsn::ERR_OK) {
beacon_ack ack;
ack.time = beacon.time;
- ack.this_node = beacon.to_node;
- ack.__set_hp_this_node(beacon.hp_to_node);
- ack.primary_node.set_invalid();
- ack.__set_hp_primary_node(host_port());
+ SET_OBJ_IP_AND_HOST_PORT(ack, this_node, beacon,
to_node);
+ RESET_IP_AND_HOST_PORT(ack, primary_node);
ack.is_master = false;
ack.allowed = true;
end_ping(err, ack, nullptr);
diff --git a/src/failure_detector/failure_detector_multimaster.cpp b/src/failure_detector/failure_detector_multimaster.cpp
index 011f5acf9..e12fc69d1 100644
--- a/src/failure_detector/failure_detector_multimaster.cpp
+++ b/src/failure_detector/failure_detector_multimaster.cpp
@@ -73,17 +73,14 @@ void
slave_failure_detector_with_multimaster::end_ping(::dsn::error_code err,
GET_HOST_PORT(ack, this_node, hp_this_node);
GET_HOST_PORT(ack, primary_node, hp_primary_node);
- LOG_INFO(
- "end ping result, error[{}], time[{}], ack.this_node[{}({})],
ack.primary_node[{}({})], "
- "ack.is_master[{}], ack.allowed[{}]",
- err,
- ack.time,
- hp_this_node,
- ack.this_node,
- hp_primary_node,
- ack.primary_node,
- ack.is_master ? "true" : "false",
- ack.allowed ? "true" : "false");
+ LOG_INFO("end ping result, error[{}], time[{}], ack.this_node[{}],
ack.primary_node[{}], "
+ "ack.is_master[{}], ack.allowed[{}]",
+ err,
+ ack.time,
+ FMT_HOST_PORT_AND_IP(ack, this_node),
+ FMT_HOST_PORT_AND_IP(ack, primary_node),
+ ack.is_master ? "true" : "false",
+ ack.allowed ? "true" : "false");
zauto_lock l(failure_detector::_lock);
if (!failure_detector::end_ping_internal(err, ack))
diff --git a/src/failure_detector/test/failure_detector.cpp b/src/failure_detector/test/failure_detector.cpp
index dcfcd69ae..1e14ebece 100644
--- a/src/failure_detector/test/failure_detector.cpp
+++ b/src/failure_detector/test/failure_detector.cpp
@@ -48,7 +48,6 @@
#include "meta/meta_server_failure_detector.h"
#include "replica/replica_stub.h"
#include "runtime/api_layer1.h"
-#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/network.h"
#include "runtime/rpc/rpc_address.h"
@@ -145,8 +144,8 @@ public:
else {
LOG_DEBUG("ignore on ping, beacon msg, time[{}], from[{}], to[{}]",
beacon.time,
- beacon.from_node,
- beacon.to_node);
+ FMT_HOST_PORT_AND_IP(beacon, from_node),
+ FMT_HOST_PORT_AND_IP(beacon, to_node));
}
}
@@ -206,7 +205,7 @@ public:
void on_master_config(const config_master_message &request, bool &response)
{
LOG_DEBUG("master config, request: {}, type: {}",
- request.master,
+ FMT_HOST_PORT_AND_IP(request, master),
request.is_register ? "reg" : "unreg");
host_port hp_master;
@@ -324,9 +323,8 @@ void worker_set_leader(test_worker *worker, int
leader_contact)
worker->fd()->set_leader_for_test(host_port("localhost", MPORT_START +
leader_contact));
config_master_message msg;
- msg.master = rpc_address::from_host_port("localhost", MPORT_START +
leader_contact);
+ SET_IP_AND_HOST_PORT_BY_DNS(msg, master, host_port("localhost",
MPORT_START + leader_contact));
msg.is_register = true;
- msg.__set_hp_master(host_port::from_address(msg.master));
error_code err;
bool response;
std::tie(err, response) = rpc::call_wait<bool>(
@@ -336,13 +334,10 @@ void worker_set_leader(test_worker *worker, int
leader_contact)
void clear(test_worker *worker, std::vector<test_master *> masters)
{
- const auto &hp_leader =
worker->fd()->get_servers().group_host_port()->leader();
- const auto &leader =
dsn::dns_resolver::instance().resolve_address(hp_leader);
-
config_master_message msg;
- msg.master = leader;
+ SET_IP_AND_HOST_PORT_BY_DNS(
+ msg, master, worker->fd()->get_servers().group_host_port()->leader());
msg.is_register = false;
- msg.__set_hp_master(hp_leader);
error_code err;
bool response;
std::tie(err, response) = rpc::call_wait<bool>(
@@ -658,10 +653,8 @@ TEST(fd, update_stability)
dsn::rpc_replier<beacon_ack> r(create_fake_rpc_response());
beacon_msg msg;
- msg.from_node = rpc_address::from_host_port("localhost", 123);
- msg.__set_hp_from_node(host_port("localhost", 123));
- msg.to_node = rpc_address::from_host_port("localhost", MPORT_START);
- msg.__set_hp_to_node(host_port("localhost", MPORT_START));
+ SET_IP_AND_HOST_PORT_BY_DNS(msg, from_node, host_port("localhost", 123));
+ SET_IP_AND_HOST_PORT_BY_DNS(msg, to_node, host_port("localhost",
MPORT_START));
msg.time = dsn_now_ms();
msg.__isset.start_time = true;
msg.start_time = 1000;
diff --git a/src/meta/meta_server_failure_detector.cpp b/src/meta/meta_server_failure_detector.cpp
index e729bd87f..56cc04e46 100644
--- a/src/meta/meta_server_failure_detector.cpp
+++ b/src/meta/meta_server_failure_detector.cpp
@@ -35,7 +35,6 @@
#include "meta/meta_options.h"
#include "meta/meta_service.h"
#include "runtime/app_model.h"
-#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/serverlet.h"
#include "runtime/task/task_code.h"
@@ -247,36 +246,31 @@ bool
meta_server_failure_detector::update_stability_stat(const fd::beacon_msg &b
} else {
worker_stability &w = iter->second;
if (beacon.start_time == w.last_start_time_ms) {
- LOG_DEBUG("{}({}) isn't restarted, last_start_time({})",
- hp_from_node,
- beacon.from_node,
+ LOG_DEBUG("{} isn't restarted, last_start_time({})",
+ FMT_HOST_PORT_AND_IP(beacon, from_node),
w.last_start_time_ms);
if (dsn_now_ms() - w.last_start_time_ms >=
FLAGS_stable_rs_min_running_seconds * 1000 &&
w.unstable_restart_count > 0) {
- LOG_INFO("{}({}) has stably run for a while, reset it's
unstable count({}) to 0",
- hp_from_node,
- beacon.from_node,
+ LOG_INFO("{} has stably run for a while, reset it's unstable
count({}) to 0",
+ FMT_HOST_PORT_AND_IP(beacon, from_node),
w.unstable_restart_count);
w.unstable_restart_count = 0;
}
} else if (beacon.start_time > w.last_start_time_ms) {
- LOG_INFO("check {}({}) restarted, last_time({}), this_time({})",
- hp_from_node,
- beacon.from_node,
+ LOG_INFO("check {} restarted, last_time({}), this_time({})",
+ FMT_HOST_PORT_AND_IP(beacon, from_node),
w.last_start_time_ms,
beacon.start_time);
if (beacon.start_time - w.last_start_time_ms <
FLAGS_stable_rs_min_running_seconds * 1000) {
w.unstable_restart_count++;
- LOG_WARNING("{}({}) encounter an unstable restart,
total_count({})",
- hp_from_node,
- beacon.from_node,
+ LOG_WARNING("{} encounter an unstable restart,
total_count({})",
+ FMT_HOST_PORT_AND_IP(beacon, from_node),
w.unstable_restart_count);
} else if (w.unstable_restart_count > 0) {
- LOG_INFO("{}({}) restart in {} ms after last restart, may
recover ok, reset "
+ LOG_INFO("{} restart in {} ms after last restart, may recover
ok, reset "
"it's unstable count({}) to 0",
- hp_from_node,
- beacon.from_node,
+ FMT_HOST_PORT_AND_IP(beacon, from_node),
beacon.start_time - w.last_start_time_ms,
w.unstable_restart_count);
w.unstable_restart_count = 0;
@@ -284,9 +278,8 @@ bool
meta_server_failure_detector::update_stability_stat(const fd::beacon_msg &b
w.last_start_time_ms = beacon.start_time;
} else {
- LOG_WARNING("{}({}): possible encounter a staled message, ignore
it",
- hp_from_node,
- beacon.from_node);
+ LOG_WARNING("{}: possible encounter a staled message, ignore it",
+ FMT_HOST_PORT_AND_IP(beacon, from_node));
}
return w.unstable_restart_count < FLAGS_max_succssive_unstable_restart;
}
@@ -295,44 +288,34 @@ bool
meta_server_failure_detector::update_stability_stat(const fd::beacon_msg &b
void meta_server_failure_detector::on_ping(const fd::beacon_msg &beacon,
rpc_replier<fd::beacon_ack> &reply)
{
- host_port hp_from_node, hp_to_node;
- GET_HOST_PORT(beacon, from_node, hp_from_node);
- GET_HOST_PORT(beacon, to_node, hp_to_node);
-
if (beacon.__isset.start_time && !update_stability_stat(beacon)) {
- LOG_WARNING(
- "{}({}) is unstable, don't response to it's beacon",
beacon.from_node, hp_from_node);
+ LOG_WARNING("{} is unstable, don't response to it's beacon",
+ FMT_HOST_PORT_AND_IP(beacon, from_node));
return;
}
fd::beacon_ack ack;
ack.time = beacon.time;
- ack.this_node = beacon.to_node;
+ SET_OBJ_IP_AND_HOST_PORT(ack, this_node, beacon, to_node);
ack.allowed = true;
- ack.__set_hp_this_node(hp_to_node);
dsn::host_port leader;
if (!get_leader(&leader)) {
ack.is_master = false;
- ack.primary_node =
dsn::dns_resolver::instance().resolve_address(leader);
- ack.__set_hp_primary_node(leader);
+ SET_IP_AND_HOST_PORT_BY_DNS(ack, primary_node, leader);
} else {
ack.is_master = true;
- ack.primary_node = beacon.to_node;
- ack.__set_hp_primary_node(hp_to_node);
+ SET_OBJ_IP_AND_HOST_PORT(ack, primary_node, beacon, to_node);
failure_detector::on_ping_internal(beacon, ack);
}
- LOG_INFO("on_ping, beacon send time[{}], is_master({}), from_node({}({})),
this_node({}({})), "
- "primary_node({}({}))",
+ LOG_INFO("on_ping, beacon send time[{}], is_master({}), from_node({}),
this_node({}), "
+ "primary_node({})",
ack.time,
ack.is_master ? "true" : "false",
- hp_from_node,
- beacon.from_node,
- hp_to_node,
- beacon.to_node,
- ack.hp_primary_node,
- ack.primary_node);
+ FMT_HOST_PORT_AND_IP(beacon, from_node),
+ FMT_HOST_PORT_AND_IP(beacon, to_node),
+ FMT_HOST_PORT_AND_IP(ack, primary_node));
reply(ack);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]