This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 6d375e72c refactor(FQDN): Add some convenient macros and update
idl/meta_admin.thrift related code (#1984)
6d375e72c is described below
commit 6d375e72c09ff661104847e020b2a82dd66a2407
Author: Yingchun Lai <[email protected]>
AuthorDate: Thu Apr 25 18:12:18 2024 +0800
refactor(FQDN): Add some convenient macros and update idl/meta_admin.thrift
related code (#1984)
Add new convenience macros GET_HOST_PORTS and SET_OBJ_IP_AND_HOST_PORT
for convenient use in tests.
---
idl/meta_admin.thrift | 8 +-
src/client/replication_ddl_client.cpp | 18 +-
src/common/replication_other_types.h | 12 ++
.../failure_detector_multimaster.cpp | 1 +
src/meta/load_balance_policy.cpp | 11 +-
src/meta/meta_data.cpp | 9 +-
src/meta/meta_service.cpp | 6 +-
src/meta/meta_split_service.cpp | 4 +-
src/meta/partition_guardian.cpp | 170 +++++++++---------
src/meta/server_load_balancer.cpp | 18 +-
src/meta/server_state.cpp | 190 ++++++++++-----------
src/meta/test/balancer_validator.cpp | 43 +++--
src/meta/test/meta_partition_guardian_test.cpp | 28 ++-
src/meta/test/meta_split_service_test.cpp | 5 +-
src/meta/test/misc/misc.cpp | 82 +++++----
src/meta/test/update_configuration_test.cpp | 11 +-
src/replica/replica.h | 2 +-
src/replica/replica_config.cpp | 102 ++++++-----
src/replica/replica_failover.cpp | 6 +-
src/replica/replica_stub.cpp | 15 +-
src/replica/storage/simple_kv/test/checker.cpp | 28 ++-
src/replica/storage/simple_kv/test/client.cpp | 6 +-
src/replica/test/open_replica_test.cpp | 6 +-
src/runtime/rpc/rpc_host_port.cpp | 9 -
src/runtime/rpc/rpc_host_port.h | 94 +++++++---
src/runtime/test/host_port_test.cpp | 29 ++++
src/shell/commands/recovery.cpp | 12 +-
27 files changed, 519 insertions(+), 406 deletions(-)
diff --git a/idl/meta_admin.thrift b/idl/meta_admin.thrift
index f5ad79b40..eec65717e 100644
--- a/idl/meta_admin.thrift
+++ b/idl/meta_admin.thrift
@@ -116,10 +116,10 @@ struct configuration_query_by_node_response
struct configuration_recovery_request
{
- 1:list<dsn.rpc_address> recovery_set;
+ 1:list<dsn.rpc_address> recovery_nodes;
2:bool skip_bad_nodes;
3:bool skip_lost_partitions;
- 4:optional list<dsn.host_port> hp_recovery_set;
+ 4:optional list<dsn.host_port> hp_recovery_nodes;
}
struct configuration_recovery_response
@@ -283,8 +283,8 @@ struct query_app_manual_compact_response
struct node_info
{
1:node_status status = node_status.NS_INVALID;
- 2:dsn.rpc_address address;
- 3:optional dsn.host_port hp_address;
+ 2:dsn.rpc_address node;
+ 3:optional dsn.host_port hp_node;
}
struct configuration_list_nodes_request
diff --git a/src/client/replication_ddl_client.cpp
b/src/client/replication_ddl_client.cpp
index 3e01d745f..5932b813d 100644
--- a/src/client/replication_ddl_client.cpp
+++ b/src/client/replication_ddl_client.cpp
@@ -505,7 +505,7 @@ dsn::error_code replication_ddl_client::list_nodes(
for (const auto &n : resp.infos) {
host_port hp;
- GET_HOST_PORT(n, address, hp);
+ GET_HOST_PORT(n, node, hp);
nodes[hp] = n.status;
}
@@ -912,20 +912,22 @@ dsn::error_code replication_ddl_client::do_recovery(const
std::vector<host_port>
std::ostream out(buf);
auto req = std::make_shared<configuration_recovery_request>();
- req->recovery_set.clear();
- req->__set_hp_recovery_set(std::vector<host_port>());
+ CLEAR_IP_AND_HOST_PORT(*req, recovery_nodes);
for (const auto &node : replica_nodes) {
- if (utils::contains(req->hp_recovery_set, node)) {
+ if (utils::contains(req->hp_recovery_nodes, node)) {
out << "duplicate replica node " << node << ", just ingore it" <<
std::endl;
} else {
- req->hp_recovery_set.push_back(node);
-
req->recovery_set.push_back(dsn::dns_resolver::instance().resolve_address(node));
+ ADD_IP_AND_HOST_PORT_BY_DNS(*req, recovery_nodes, node);
}
}
- if (req->hp_recovery_set.empty()) {
+ if (req->hp_recovery_nodes.empty()) {
+ CHECK(req->recovery_nodes.empty(),
+ "recovery_nodes should be set together with hp_recovery_nodes");
out << "node set for recovery it empty" << std::endl;
return ERR_INVALID_PARAMETERS;
}
+ CHECK(!req->recovery_nodes.empty(),
+ "recovery_nodes should be set together with hp_recovery_nodes");
req->skip_bad_nodes = skip_bad_nodes;
req->skip_lost_partitions = skip_lost_partitions;
@@ -934,7 +936,7 @@ dsn::error_code replication_ddl_client::do_recovery(const
std::vector<host_port>
out << "Skip lost partitions: " << (skip_lost_partitions ? "true" :
"false") << std::endl;
out << "Node list:" << std::endl;
out << "=============================" << std::endl;
- for (auto &node : req->hp_recovery_set) {
+ for (auto &node : req->hp_recovery_nodes) {
out << node << std::endl;
}
out << "=============================" << std::endl;
diff --git a/src/common/replication_other_types.h
b/src/common/replication_other_types.h
index 457aa4010..7f8c51dcd 100644
--- a/src/common/replication_other_types.h
+++ b/src/common/replication_other_types.h
@@ -55,14 +55,26 @@ inline bool is_primary(const partition_configuration &pc,
const host_port &node)
{
return node && pc.hp_primary == node;
}
+inline bool is_primary(const partition_configuration &pc, const rpc_address
&node)
+{
+ return node && pc.primary == node;
+}
inline bool is_secondary(const partition_configuration &pc, const host_port
&node)
{
return node && utils::contains(pc.hp_secondaries, node);
}
+inline bool is_secondary(const partition_configuration &pc, const rpc_address
&node)
+{
+ return node && utils::contains(pc.secondaries, node);
+}
inline bool is_member(const partition_configuration &pc, const host_port &node)
{
return is_primary(pc, node) || is_secondary(pc, node);
}
+inline bool is_member(const partition_configuration &pc, const rpc_address
&node)
+{
+ return is_primary(pc, node) || is_secondary(pc, node);
+}
inline bool is_partition_config_equal(const partition_configuration &pc1,
const partition_configuration &pc2)
{
diff --git a/src/failure_detector/failure_detector_multimaster.cpp
b/src/failure_detector/failure_detector_multimaster.cpp
index 8f56eb603..011f5acf9 100644
--- a/src/failure_detector/failure_detector_multimaster.cpp
+++ b/src/failure_detector/failure_detector_multimaster.cpp
@@ -29,6 +29,7 @@
#include "failure_detector/failure_detector_multimaster.h"
#include "fd_types.h"
+#include "runtime/rpc/dns_resolver.h" // IWYU pragma: keep
#include "runtime/rpc/rpc_host_port.h"
#include "utils/error_code.h"
#include "utils/rand.h"
diff --git a/src/meta/load_balance_policy.cpp b/src/meta/load_balance_policy.cpp
index 0dab076c5..7b0bd19d8 100644
--- a/src/meta/load_balance_policy.cpp
+++ b/src/meta/load_balance_policy.cpp
@@ -33,7 +33,8 @@
#include "dsn.layer2_types.h"
#include "meta/meta_data.h"
#include "meta_admin_types.h"
-#include "runtime/rpc/dns_resolver.h"
+#include "runtime/rpc/dns_resolver.h" // IWYU pragma: keep
+#include "runtime/rpc/rpc_address.h"
#include "utils/command_manager.h"
#include "utils/fail_point.h"
#include "utils/flags.h"
@@ -50,13 +51,9 @@ namespace replication {
configuration_proposal_action
new_proposal_action(const host_port &target, const host_port &node,
config_type::type type)
{
- const auto &target_addr =
dsn::dns_resolver::instance().resolve_address(target);
- const auto &node_addr =
dsn::dns_resolver::instance().resolve_address(node);
configuration_proposal_action act;
- act.__set_target(target_addr);
- act.__set_node(node_addr);
- act.__set_hp_target(target);
- act.__set_hp_node(node);
+ SET_IP_AND_HOST_PORT_BY_DNS(act, target, target);
+ SET_IP_AND_HOST_PORT_BY_DNS(act, node, node);
act.__set_type(type);
return act;
}
diff --git a/src/meta/meta_data.cpp b/src/meta/meta_data.cpp
index 029cca0e2..cc6e96c01 100644
--- a/src/meta/meta_data.cpp
+++ b/src/meta/meta_data.cpp
@@ -187,11 +187,14 @@ void proposal_actions::reset_tracked_current_learner()
void proposal_actions::track_current_learner(const dsn::host_port &node, const
replica_info &info)
{
- if (empty())
+ if (empty()) {
return;
- configuration_proposal_action &act = acts.front();
- if (act.hp_node != node)
+ }
+ const auto &act = acts.front();
+ CHECK(act.hp_node, "");
+ if (act.hp_node != node) {
return;
+ }
// currently we only handle add secondary
// TODO: adjust other proposals according to replica info collected
diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index 00ca0882a..b619bd55b 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -693,8 +693,7 @@ void
meta_service::on_list_nodes(configuration_list_nodes_rpc rpc)
if (request.status == node_status::NS_INVALID || request.status ==
node_status::NS_ALIVE) {
info.status = node_status::NS_ALIVE;
for (auto &node : _alive_set) {
- info.address =
dsn::dns_resolver::instance().resolve_address(node);
- info.__set_hp_address(node);
+ SET_IP_AND_HOST_PORT_BY_DNS(info, node, node);
response.infos.push_back(info);
}
}
@@ -702,8 +701,7 @@ void
meta_service::on_list_nodes(configuration_list_nodes_rpc rpc)
request.status == node_status::NS_UNALIVE) {
info.status = node_status::NS_UNALIVE;
for (auto &node : _dead_set) {
- info.address =
dsn::dns_resolver::instance().resolve_address(node);
- info.__set_hp_address(node);
+ SET_IP_AND_HOST_PORT_BY_DNS(info, node, node);
response.infos.push_back(info);
}
}
diff --git a/src/meta/meta_split_service.cpp b/src/meta/meta_split_service.cpp
index 1fc770ae9..00f4449a5 100644
--- a/src/meta/meta_split_service.cpp
+++ b/src/meta/meta_split_service.cpp
@@ -42,6 +42,7 @@
#include "metadata_types.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_holder.h"
+#include "runtime/rpc/rpc_host_port.h"
#include "runtime/task/async_calls.h"
#include "utils/blob.h"
#include "utils/error_code.h"
@@ -304,8 +305,7 @@ void
meta_split_service::on_add_child_on_remote_storage_reply(error_code ec,
update_child_request->config = request.child_config;
update_child_request->info = *app;
update_child_request->type = config_type::CT_REGISTER_CHILD;
- update_child_request->node = request.primary;
- update_child_request->__set_hp_node(request.hp_primary);
+ SET_OBJ_IP_AND_HOST_PORT(*update_child_request, node, request, primary);
partition_configuration child_config =
app->partitions[child_gpid.get_partition_index()];
child_config.secondaries = request.child_config.secondaries;
diff --git a/src/meta/partition_guardian.cpp b/src/meta/partition_guardian.cpp
index 6ecbda5c5..37f627f9e 100644
--- a/src/meta/partition_guardian.cpp
+++ b/src/meta/partition_guardian.cpp
@@ -37,11 +37,11 @@
#include "meta/server_load_balancer.h"
#include "meta/server_state.h"
#include "meta/table_metrics.h"
-#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/metrics.h"
+#include "utils/ports.h"
#include "utils/strings.h"
#include "utils/time_utils.h"
@@ -131,18 +131,16 @@ void partition_guardian::reconfig(meta_view view, const
configuration_update_req
}
} else {
when_update_replicas(request.type, [cc, &request](bool is_adding) {
+ host_port hp;
+ GET_HOST_PORT(request, node, hp);
if (is_adding) {
- cc->remove_from_dropped(request.hp_node);
+ cc->remove_from_dropped(hp);
// when some replicas are added to partition_config
// we should try to adjust the size of drop_list
cc->check_size();
} else {
- cc->remove_from_serving(request.hp_node);
-
- CHECK(cc->record_drop_history(request.hp_node),
- "node({}({})) has been in the dropped",
- request.hp_node,
- request.node);
+ cc->remove_from_serving(hp);
+ CHECK(cc->record_drop_history(hp), "node({}) has been in the
dropped", hp);
}
});
}
@@ -161,23 +159,28 @@ bool partition_guardian::from_proposals(meta_view &view,
return false;
}
action = *(cc.lb_actions.front());
+ host_port target;
+ host_port node;
+ GET_HOST_PORT(action, target, target);
std::string reason;
- if (!action.target || !action.hp_target) {
+ if (!target) {
reason = "action target is invalid";
goto invalid_action;
}
- if (!action.node || !action.hp_node) {
- reason = "action node is invalid";
+ if (!is_node_alive(*(view.nodes), target)) {
+ reason = fmt::format("action target({}) is not alive", target);
goto invalid_action;
}
- if (!is_node_alive(*(view.nodes), action.hp_target)) {
- reason = fmt::format("action target({}) is not alive",
action.hp_target);
+ GET_HOST_PORT(action, node, node);
+ if (!node) {
+ reason = "action node is invalid";
goto invalid_action;
}
- if (!is_node_alive(*(view.nodes), action.hp_node)) {
- reason = fmt::format("action node({}) is not alive", action.hp_node);
+ if (!is_node_alive(*(view.nodes), node)) {
+ reason = fmt::format("action node({}) is not alive", node);
goto invalid_action;
}
+
if (cc.lb_actions.is_abnormal_learning_proposal()) {
reason = "learning process abnormal";
goto invalid_action;
@@ -185,24 +188,22 @@ bool partition_guardian::from_proposals(meta_view &view,
switch (action.type) {
case config_type::CT_ASSIGN_PRIMARY:
- is_action_valid = (action.hp_node == action.hp_target && !pc.primary &&
- !is_secondary(pc, action.hp_node));
+ is_action_valid = (node == target && !pc.primary && !is_secondary(pc,
node));
break;
case config_type::CT_UPGRADE_TO_PRIMARY:
- is_action_valid =
- (action.hp_node == action.hp_target && !pc.primary &&
is_secondary(pc, action.hp_node));
+ is_action_valid = (node == target && !pc.primary && is_secondary(pc,
node));
break;
case config_type::CT_ADD_SECONDARY:
case config_type::CT_ADD_SECONDARY_FOR_LB:
- is_action_valid = (is_primary(pc, action.hp_target) &&
!is_secondary(pc, action.hp_node));
- is_action_valid = (is_action_valid && is_node_alive(*(view.nodes),
action.hp_node));
+ is_action_valid = (is_primary(pc, target) && !is_secondary(pc, node));
+ is_action_valid = (is_action_valid && is_node_alive(*(view.nodes),
node));
break;
case config_type::CT_DOWNGRADE_TO_INACTIVE:
case config_type::CT_REMOVE:
- is_action_valid = (is_primary(pc, action.hp_target) && is_member(pc,
action.hp_node));
+ is_action_valid = (is_primary(pc, target) && is_member(pc, node));
break;
case config_type::CT_DOWNGRADE_TO_SECONDARY:
- is_action_valid = (action.hp_target == action.hp_node &&
is_primary(pc, action.hp_target));
+ is_action_valid = (target == node && is_primary(pc, target));
break;
default:
is_action_valid = false;
@@ -245,38 +246,38 @@ pc_status
partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi
action.type = config_type::CT_INVALID;
// try to upgrade a secondary to primary if the primary is missing
- if (pc.hp_secondaries.size() > 0) {
- action.node.set_invalid();
- action.__set_hp_node(host_port());
-
- for (int i = 0; i < pc.hp_secondaries.size(); ++i) {
- auto ns = get_node_state(*(view.nodes), pc.hp_secondaries[i],
false);
- CHECK_NOTNULL(ns, "invalid secondary address, address = {}",
pc.hp_secondaries[i]);
- if (!ns->alive())
+ if (!pc.hp_secondaries.empty()) {
+ RESET_IP_AND_HOST_PORT(action, node);
+ for (const auto &hp_secondary : pc.hp_secondaries) {
+ const auto ns = get_node_state(*(view.nodes), hp_secondary, false);
+ CHECK_NOTNULL(ns, "invalid secondary: {}", hp_secondary);
+ if (dsn_unlikely(!ns->alive())) {
continue;
+ }
// find a node with minimal primaries
- newly_partitions *np = newly_partitions_ext::get_inited(ns);
- if (!action.hp_node ||
- np->less_primaries(*get_newly_partitions(*(view.nodes),
action.hp_node),
- gpid.get_app_id())) {
- action.node =
dsn::dns_resolver::instance().resolve_address(ns->host_port());
- action.__set_hp_node(ns->host_port());
+ host_port node;
+ GET_HOST_PORT(action, node, node);
+ auto *np = newly_partitions_ext::get_inited(ns);
+ if (!node ||
+ np->less_primaries(*get_newly_partitions(*(view.nodes), node),
gpid.get_app_id())) {
+ SET_IP_AND_HOST_PORT_BY_DNS(action, node, ns->host_port());
}
}
- if (!action.hp_node) {
+ host_port node;
+ GET_HOST_PORT(action, node, node);
+ if (!node) {
LOG_ERROR(
"all nodes for gpid({}) are dead, waiting for some secondary
to come back....",
gpid_name);
result = pc_status::dead;
} else {
action.type = config_type::CT_UPGRADE_TO_PRIMARY;
- newly_partitions *np = get_newly_partitions(*(view.nodes),
action.hp_node);
+ newly_partitions *np = get_newly_partitions(*(view.nodes), node);
np->newly_add_primary(gpid.get_app_id(), true);
- action.target = action.node;
- action.hp_target = action.hp_node;
+ SET_OBJ_IP_AND_HOST_PORT(action, target, action, node);
result = pc_status::ill;
}
}
@@ -300,10 +301,8 @@ pc_status partition_guardian::on_missing_primary(meta_view
&view, const dsn::gpi
}
if (min_primary_server_np != nullptr) {
- action.node =
dsn::dns_resolver::instance().resolve_address(min_primary_server);
- action.__set_hp_node(min_primary_server);
- action.target = action.node;
- action.__set_hp_target(action.hp_node);
+ SET_IP_AND_HOST_PORT_BY_DNS(action, node, min_primary_server);
+ SET_OBJ_IP_AND_HOST_PORT(action, target, action, node);
action.type = config_type::CT_ASSIGN_PRIMARY;
min_primary_server_np->newly_add_primary(gpid.get_app_id(), false);
}
@@ -320,8 +319,7 @@ pc_status partition_guardian::on_missing_primary(meta_view
&view, const dsn::gpi
// so the last removed replica can't act as primary directly.
std::string reason;
config_context &cc = *get_config_context(*view.apps, gpid);
- action.node.set_invalid();
- action.__set_hp_node(host_port());
+ RESET_IP_AND_HOST_PORT(action, node);
for (int i = 0; i < cc.dropped.size(); ++i) {
const dropped_replica &dr = cc.dropped[i];
char time_buf[30] = {0};
@@ -358,8 +356,7 @@ pc_status partition_guardian::on_missing_primary(meta_view
&view, const dsn::gpi
LOG_WARNING("{}: the only node({}) is dead, waiting it to come
back",
gpid_name,
FMT_HOST_PORT_AND_IP(pc, last_drops.back()));
- action.hp_node = pc.hp_last_drops.back();
- action.node = pc.last_drops.back();
+ SET_OBJ_IP_AND_HOST_PORT(action, node, pc, last_drops.back());
} else {
std::vector<dsn::host_port> nodes(pc.hp_last_drops.end() - 2,
pc.hp_last_drops.end());
std::vector<dropped_replica> collected_info(2);
@@ -424,24 +421,24 @@ pc_status
partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi
int64_t larger_pd =
std::max(previous_dead.last_prepared_decree,
recent_dead.last_prepared_decree);
if (larger_pd >= pc.last_committed_decree && larger_pd >=
larger_cd) {
+ host_port hp;
if (gap1 != 0) {
// 1. choose node with larger ballot
- action.hp_node = gap1 < 0 ? recent_dead.node :
previous_dead.node;
+ hp = gap1 < 0 ? recent_dead.node :
previous_dead.node;
} else if (gap2 != 0) {
// 2. choose node with larger last_committed_decree
- action.hp_node = gap2 < 0 ? recent_dead.node :
previous_dead.node;
+ hp = gap2 < 0 ? recent_dead.node :
previous_dead.node;
} else {
// 3. choose node with larger last_prepared_decree
- action.hp_node =
previous_dead.last_prepared_decree >
-
recent_dead.last_prepared_decree
- ? previous_dead.node
- : recent_dead.node;
+ hp = previous_dead.last_prepared_decree >
+ recent_dead.last_prepared_decree
+ ? previous_dead.node
+ : recent_dead.node;
}
- action.node =
dsn::dns_resolver::instance().resolve_address(action.hp_node);
- LOG_INFO("{}: select {}({}) as a new primary",
+ SET_IP_AND_HOST_PORT_BY_DNS(action, node, hp);
+ LOG_INFO("{}: select {} as a new primary",
gpid_name,
- action.hp_node,
- action.node);
+ FMT_HOST_PORT_AND_IP(action, node));
} else {
char buf[1000];
sprintf(buf,
@@ -461,9 +458,10 @@ pc_status partition_guardian::on_missing_primary(meta_view
&view, const dsn::gpi
}
}
+ // Use the action.hp_node after being updated.
if (action.hp_node) {
- action.__set_hp_target(action.hp_node);
- action.target = action.node;
+ CHECK(action.node, "");
+ SET_OBJ_IP_AND_HOST_PORT(action, target, action, node);
action.type = config_type::CT_ASSIGN_PRIMARY;
get_newly_partitions(*view.nodes, action.hp_node)
@@ -480,8 +478,7 @@ pc_status partition_guardian::on_missing_primary(meta_view
&view, const dsn::gpi
for (int i = 0; i < cc.dropped.size(); ++i) {
const dropped_replica &dr = cc.dropped[i];
ddd_node_info ninfo;
- ninfo.node =
dsn::dns_resolver::instance().resolve_address(dr.node);
- ninfo.__set_hp_node(dr.node);
+ SET_IP_AND_HOST_PORT_BY_DNS(ninfo, node, dr.node);
ninfo.drop_time_ms = dr.time;
ninfo.ballot = invalid_ballot;
ninfo.last_committed_decree = invalid_decree;
@@ -547,8 +544,7 @@ pc_status
partition_guardian::on_missing_secondary(meta_view &view, const dsn::g
cc.dropped.back().node);
is_emergency = true;
}
- action.node.set_invalid();
- action.__set_hp_node(host_port());
+ RESET_IP_AND_HOST_PORT(action, node);
if (is_emergency) {
std::ostringstream oss;
@@ -582,8 +578,7 @@ pc_status
partition_guardian::on_missing_secondary(meta_view &view, const dsn::g
cc.prefered_dropped,
cc.prefered_dropped,
cc.prefered_dropped - 1);
- action.hp_node = server.node;
- action.node =
dsn::dns_resolver::instance().resolve_address(server.node);
+ SET_IP_AND_HOST_PORT_BY_DNS(action, node, server.node);
cc.prefered_dropped--;
break;
} else {
@@ -598,26 +593,27 @@ pc_status
partition_guardian::on_missing_secondary(meta_view &view, const dsn::g
}
}
- if (!action.hp_node || in_black_list(action.hp_node)) {
- if (action.hp_node) {
- LOG_INFO("gpid({}) refuse to use selected node({}) as it is in
black list",
- gpid,
- action.hp_node);
+ host_port node;
+ GET_HOST_PORT(action, node, node);
+ if (!node || in_black_list(node)) {
+ if (node) {
+ LOG_INFO(
+ "gpid({}) refuse to use selected node({}) as it is in
black list", gpid, node);
}
newly_partitions *min_server_np = nullptr;
- for (auto &pairs : *view.nodes) {
- node_state &ns = pairs.second;
- if (!ns.alive() || is_member(pc, ns.host_port()) ||
in_black_list(ns.host_port()))
+ for (auto & [ _, ns ] : *view.nodes) {
+ if (!ns.alive() || is_member(pc, ns.host_port()) ||
in_black_list(ns.host_port())) {
continue;
+ }
newly_partitions *np = newly_partitions_ext::get_inited(&ns);
if (min_server_np == nullptr ||
np->less_partitions(*min_server_np, gpid.get_app_id())) {
- action.__set_hp_node(ns.host_port());
- action.node =
dsn::dns_resolver::instance().resolve_address(ns.host_port());
+ SET_IP_AND_HOST_PORT_BY_DNS(action, node, ns.host_port());
min_server_np = np;
}
}
+ // Use the action.hp_node after being updated.
if (action.hp_node) {
LOG_INFO("gpid({}): can't find valid node in dropped list to
add as secondary, "
"choose new node({}) with minimal partitions serving",
@@ -634,10 +630,10 @@ pc_status
partition_guardian::on_missing_secondary(meta_view &view, const dsn::g
const dropped_replica &server = cc.dropped.back();
if (is_node_alive(*view.nodes, server.node)) {
CHECK(server.node, "invalid server address, address = {}",
server.node);
- action.hp_node = server.node;
- action.node =
dsn::dns_resolver::instance().resolve_address(server.node);
+ SET_IP_AND_HOST_PORT_BY_DNS(action, node, server.node);
}
+ // Use the action.hp_node after being updated.
if (action.hp_node) {
LOG_INFO("gpid({}): choose node({}) as secondary coz it is
last_dropped_node and is "
"alive now",
@@ -651,10 +647,10 @@ pc_status
partition_guardian::on_missing_secondary(meta_view &view, const dsn::g
}
}
+ // Use the action.hp_node after being updated.
if (action.hp_node) {
action.type = config_type::CT_ADD_SECONDARY;
- action.target = pc.primary;
- action.__set_hp_target(pc.hp_primary);
+ SET_OBJ_IP_AND_HOST_PORT(action, target, pc, primary);
newly_partitions *np = get_newly_partitions(*(view.nodes),
action.hp_node);
CHECK_NOTNULL(np, "");
@@ -682,10 +678,8 @@ pc_status
partition_guardian::on_redundant_secondary(meta_view &view, const dsn:
configuration_proposal_action action;
action.type = config_type::CT_REMOVE;
- action.node = pc.secondaries[target];
- action.target = pc.primary;
- action.hp_node = pc.hp_secondaries[target];
- action.hp_target = pc.hp_primary;
+ SET_OBJ_IP_AND_HOST_PORT(action, node, pc, secondaries[target]);
+ SET_OBJ_IP_AND_HOST_PORT(action, target, pc, primary);
// TODO: treat remove as cure proposals too
get_config_context(*view.apps,
gpid)->lb_actions.assign_balancer_proposals({action});
@@ -696,11 +690,13 @@ void partition_guardian::finish_cure_proposal(meta_view
&view,
const dsn::gpid &gpid,
const
configuration_proposal_action &act)
{
- newly_partitions *np = get_newly_partitions(*(view.nodes), act.hp_node);
+ host_port target;
+ GET_HOST_PORT(act, node, target);
+ newly_partitions *np = get_newly_partitions(*(view.nodes), target);
if (np == nullptr) {
LOG_INFO("can't get the newly_partitions extension structure for
node({}), "
"the node may be dead and removed",
- act.hp_node);
+ target);
} else {
if (act.type == config_type::CT_ASSIGN_PRIMARY) {
np->newly_remove_primary(gpid.get_app_id(), false);
diff --git a/src/meta/server_load_balancer.cpp
b/src/meta/server_load_balancer.cpp
index 3bbab0789..1a57858e3 100644
--- a/src/meta/server_load_balancer.cpp
+++ b/src/meta/server_load_balancer.cpp
@@ -32,6 +32,7 @@
#include "dsn.layer2_types.h"
#include "meta/meta_data.h"
#include "meta_admin_types.h"
+#include "runtime/rpc/dns_resolver.h" // IWYU pragma: keep
#include "runtime/rpc/rpc_address.h"
#include "utils/error_code.h"
#include "utils/fmt_logging.h"
@@ -174,15 +175,16 @@ void server_load_balancer::register_proposals(meta_view
view,
// to send the proposal to.
// for these proposals, they should keep the target empty and
// the meta-server will fill primary as target.
- if (!act.target) {
- if (pc.hp_primary) {
- act.target = pc.primary;
- act.__set_hp_target(pc.hp_primary);
- } else {
- resp.err = ERR_INVALID_PARAMETERS;
- return;
- }
+ if (act.target) {
+ continue;
+ }
+
+ if (!pc.hp_primary) {
+ resp.err = ERR_INVALID_PARAMETERS;
+ return;
}
+
+ SET_OBJ_IP_AND_HOST_PORT(act, target, pc, primary);
}
resp.err = ERR_OK;
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index 6e3924e89..2140105af 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -570,6 +570,8 @@ dsn::error_code
server_state::sync_apps_from_remote_storage()
const blob &value)
mutable {
if (ec == ERR_OK) {
partition_configuration pc;
+ // TODO(yingchun): when upgrade from old version, check if
the fields will be
+ // filled.
// TODO(yingchun): check if the fields will be set after
decoding.
pc.__isset.hp_secondaries = true;
pc.__isset.hp_last_drops = true;
@@ -796,21 +798,19 @@ void
server_state::on_config_sync(configuration_query_by_node_rpc rpc)
bool reject_this_request = false;
response.__isset.gc_replicas = false;
- host_port hp_node;
- GET_HOST_PORT(request, node, hp_node);
-
- LOG_INFO("got config sync request from {}({}), stored_replicas_count({})",
- hp_node,
- request.node,
+ host_port node;
+ GET_HOST_PORT(request, node, node);
+ LOG_INFO("got config sync request from {}, stored_replicas_count({})",
+ node,
request.stored_replicas.size());
{
zauto_read_lock l(_lock);
// sync the partitions to the replica server
- node_state *ns = get_node_state(_nodes, hp_node, false);
+ node_state *ns = get_node_state(_nodes, node, false);
if (ns == nullptr) {
- LOG_INFO("node({}({})) not found in meta server", hp_node,
request.node);
+ LOG_INFO("node({}) not found in meta server", node);
response.err = ERR_OBJECT_NOT_FOUND;
} else {
response.err = ERR_OK;
@@ -825,15 +825,19 @@ void
server_state::on_config_sync(configuration_query_by_node_rpc rpc)
// so if the syncing config is related to the node, we may
need to reject this
// request
if (cc.stage == config_status::pending_remote_sync) {
- configuration_update_request *req =
cc.pending_sync_request.get();
+ configuration_update_request *pending_request =
cc.pending_sync_request.get();
// when register child partition, stage is
config_status::pending_remote_sync,
// but cc.pending_sync_request is not set, see more in
function
// 'register_child_on_meta'
- if (req == nullptr)
+ if (pending_request == nullptr) {
return false;
- if ((req->__isset.hp_node && req->hp_node == hp_node) ||
- req->node == request.node)
+ }
+
+ host_port target;
+ GET_HOST_PORT(*pending_request, node, target);
+ if (target == node) {
return false;
+ }
}
response.partitions[i].info = *app;
@@ -867,8 +871,7 @@ void
server_state::on_config_sync(configuration_query_by_node_rpc rpc)
// the app is deleted but not expired, we need to ignore it
// if the app is deleted and expired, we need to gc it
for (const replica_info &rep : replicas) {
- LOG_DEBUG(
- "receive stored replica from {}({}), pid({})", hp_node,
request.node, rep.pid);
+ LOG_DEBUG("receive stored replica from {}, pid({})", node,
rep.pid);
std::shared_ptr<app_state> app = get_app(rep.pid.get_app_id());
if (app == nullptr || rep.pid.get_partition_index() >=
app->partition_count) {
// This app has garbage partition after cancel split, the
canceled child
@@ -878,64 +881,55 @@ void
server_state::on_config_sync(configuration_query_by_node_rpc rpc)
rep.status == partition_status::PS_ERROR) {
response.gc_replicas.push_back(rep);
LOG_WARNING(
- "notify node({}({})) to gc replica({}) because it
is useless partition "
+ "notify node({}) to gc replica({}) because it is
useless partition "
"which is caused by cancel split",
- hp_node,
- request.node,
+ node,
rep.pid);
} else {
// app is not recognized or partition is not recognized
CHECK(false,
- "gpid({}) on node({}({})) is not exist on meta
server, administrator "
+ "gpid({}) on node({}) is not exist on meta
server, administrator "
"should check consistency of meta data",
rep.pid,
- hp_node,
- request.node);
+ node);
}
} else if (app->status == app_status::AS_DROPPED) {
if (app->expire_second == 0) {
- LOG_INFO(
- "gpid({}) on node({}({})) is of dropped table, but
expire second is "
- "not specified, do not delete it for safety
reason",
- rep.pid,
- hp_node,
- request.node);
+ LOG_INFO("gpid({}) on node({}) is of dropped table,
but expire second is "
+ "not specified, do not delete it for safety
reason",
+ rep.pid,
+ node);
} else if (has_seconds_expired(app->expire_second)) {
// can delete replica only when expire second is
explicitely specified and
// expired.
if (level <= meta_function_level::fl_steady) {
- LOG_INFO(
- "gpid({}) on node({}({})) is of dropped and
expired table, but "
- "current function level is {}, do not delete
it for safety "
- "reason",
- rep.pid,
- hp_node,
- request.node,
-
_meta_function_level_VALUES_TO_NAMES.find(level)->second);
+ LOG_INFO("gpid({}) on node({}) is of dropped and
expired table, but "
+ "current function level is {}, do not
delete it for safety "
+ "reason",
+ rep.pid,
+ node,
+
_meta_function_level_VALUES_TO_NAMES.find(level)->second);
} else {
response.gc_replicas.push_back(rep);
- LOG_WARNING("notify node({}({})) to gc replica({})
coz the app is "
+ LOG_WARNING("notify node({}) to gc replica({}) coz
the app is "
"dropped and expired",
- hp_node,
- request.node,
+ node,
rep.pid);
}
}
} else if (app->status == app_status::AS_AVAILABLE) {
- bool is_useful_replica = collect_replica({&_all_apps,
&_nodes}, hp_node, rep);
+ bool is_useful_replica = collect_replica({&_all_apps,
&_nodes}, node, rep);
if (!is_useful_replica) {
if (level <= meta_function_level::fl_steady) {
- LOG_INFO("gpid({}) on node({}({})) is useless, but
current function "
+ LOG_INFO("gpid({}) on node({}) is useless, but
current function "
"level is {}, do not delete it for safety
reason",
rep.pid,
- hp_node,
- request.node,
+ node,
_meta_function_level_VALUES_TO_NAMES.find(level)->second);
} else {
response.gc_replicas.push_back(rep);
- LOG_WARNING("notify node({}({})) to gc replica({})
coz it is useless",
- hp_node,
- request.node,
+ LOG_WARNING("notify node({}) to gc replica({}) coz
it is useless",
+ node,
rep.pid);
}
}
@@ -952,10 +946,9 @@ void
server_state::on_config_sync(configuration_query_by_node_rpc rpc)
response.err = ERR_BUSY;
response.partitions.clear();
}
- LOG_INFO("send config sync response to {}({}), err({}),
partitions_count({}), "
+ LOG_INFO("send config sync response to {}, err({}), partitions_count({}), "
"gc_replicas_count({})",
- hp_node,
- request.node,
+ node,
response.err,
response.partitions.size(),
response.gc_replicas.size());
@@ -1436,15 +1429,12 @@ void server_state::list_apps(const
configuration_list_apps_request &request,
void server_state::send_proposal(const host_port &target,
const configuration_update_request &proposal)
{
- host_port hp_node;
- GET_HOST_PORT(proposal, node, hp_node);
- LOG_INFO("send proposal {} for gpid({}), ballot = {}, target = {}, node =
{}({})",
+ LOG_INFO("send proposal {} for gpid({}), ballot = {}, target = {}, node =
{}",
::dsn::enum_to_string(proposal.type),
proposal.config.pid,
proposal.config.ballot,
target,
- hp_node,
- proposal.node);
+ FMT_HOST_PORT_AND_IP(proposal, node));
dsn::message_ex *msg =
dsn::message_ex::create_request(RPC_CONFIG_PROPOSAL, 0,
proposal.config.pid.thread_hash());
dsn::marshall(msg, proposal);
@@ -1458,17 +1448,17 @@ void server_state::send_proposal(const
configuration_proposal_action &action,
configuration_update_request request;
request.info = app;
request.type = action.type;
- request.node = action.node;
- request.__set_hp_node(action.hp_node);
+ SET_OBJ_IP_AND_HOST_PORT(request, node, action, node);
request.config = pc;
- send_proposal(action.hp_target, request);
+ host_port target;
+ GET_HOST_PORT(action, target, target);
+ send_proposal(target, request);
}
void server_state::request_check(const partition_configuration &old,
const configuration_update_request &request)
{
const partition_configuration &new_config = request.config;
-
switch (request.type) {
case config_type::CT_ASSIGN_PRIMARY:
if (request.__isset.hp_node) {
@@ -1517,7 +1507,7 @@ void server_state::request_check(const
partition_configuration &old,
CHECK(!utils::contains(old.secondaries, request.node), "");
}
break;
- case config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT:
+ case config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT: {
if (request.__isset.hp_node) {
CHECK_EQ(old.hp_primary, new_config.hp_primary);
CHECK(old.hp_secondaries == new_config.hp_secondaries, "");
@@ -1526,6 +1516,7 @@ void server_state::request_check(const
partition_configuration &old,
CHECK(old.secondaries == new_config.secondaries, "");
}
break;
+ }
default:
break;
}
@@ -1543,8 +1534,8 @@ void server_state::update_configuration_locally(
health_status old_health_status = partition_health_status(old_cfg,
min_2pc_count);
health_status new_health_status = partition_health_status(new_cfg,
min_2pc_count);
- host_port hp_node;
- GET_HOST_PORT(*config_request, node, hp_node);
+ host_port node;
+ GET_HOST_PORT(*config_request, node, node);
if (app.is_stateful) {
CHECK(old_cfg.ballot == invalid_ballot || old_cfg.ballot + 1 ==
new_cfg.ballot,
@@ -1554,9 +1545,8 @@ void server_state::update_configuration_locally(
node_state *ns = nullptr;
if (config_request->type != config_type::CT_DROP_PARTITION) {
- ns = get_node_state(_nodes, hp_node, false);
- CHECK_NOTNULL(
- ns, "invalid node address, address = {}({})", hp_node,
config_request->node);
+ ns = get_node_state(_nodes, node, false);
+ CHECK_NOTNULL(ns, "invalid node: {}", node);
}
#ifndef NDEBUG
request_check(old_cfg, *config_request);
@@ -1597,6 +1587,7 @@ void server_state::update_configuration_locally(
break;
case config_type::CT_REGISTER_CHILD: {
ns->put_partition(gpid, true);
+ // TODO(yingchun): optimize this
if (config_request->config.__isset.hp_secondaries) {
for (const auto &secondary :
config_request->config.hp_secondaries) {
auto secondary_node = get_node_state(_nodes, secondary,
false);
@@ -1618,25 +1609,22 @@ void server_state::update_configuration_locally(
} else {
CHECK_EQ(old_cfg.ballot, new_cfg.ballot);
- auto host_node = host_port::from_address(config_request->host_node);
+ const auto host_node =
host_port::from_address(config_request->host_node);
new_cfg = old_cfg;
partition_configuration_stateless pcs(new_cfg);
if (config_request->type == config_type::type::CT_ADD_SECONDARY) {
pcs.hosts().emplace_back(host_node);
- pcs.workers().emplace_back(hp_node);
+ pcs.workers().emplace_back(node);
} else {
auto it = std::remove(pcs.hosts().begin(), pcs.hosts().end(),
host_node);
pcs.hosts().erase(it);
- it = std::remove(pcs.workers().begin(), pcs.workers().end(),
hp_node);
+ it = std::remove(pcs.workers().begin(), pcs.workers().end(), node);
pcs.workers().erase(it);
}
auto it = _nodes.find(host_node);
- CHECK(it != _nodes.end(),
- "invalid node address, address = {}({})",
- host_node,
- config_request->host_node);
+ CHECK(it != _nodes.end(), "invalid node: {}", host_node);
if (config_type::CT_REMOVE == config_request->type) {
it->second.remove_partition(gpid, false);
} else {
@@ -1761,10 +1749,11 @@ void
server_state::on_update_configuration_on_remote_reply(
// ignore adding secondary if
add_secondary_enable_flow_control = true
} else {
config_request->type = action.type;
- config_request->node = action.node;
- config_request->__set_hp_node(action.hp_node);
+ SET_OBJ_IP_AND_HOST_PORT(*config_request, node, action,
node);
config_request->info = *app;
- send_proposal(action.hp_target, *config_request);
+ host_port target;
+ GET_HOST_PORT(action, target, target);
+ send_proposal(target, *config_request);
}
}
}
@@ -1812,8 +1801,7 @@ void
server_state::drop_partition(std::shared_ptr<app_state> &app, int pidx)
request.info = *app;
request.type = config_type::CT_DROP_PARTITION;
- request.node = pc.primary;
- request.__set_hp_node(pc.hp_primary);
+ SET_OBJ_IP_AND_HOST_PORT(request, node, pc, primary);
request.config = pc;
for (auto &node : pc.hp_secondaries) {
@@ -1885,8 +1873,7 @@ void
server_state::downgrade_primary_to_inactive(std::shared_ptr<app_state> &app
request.info = *app;
request.config = pc;
request.type = config_type::CT_DOWNGRADE_TO_INACTIVE;
- request.node = pc.primary;
- request.__set_hp_node(pc.hp_primary);
+ SET_OBJ_IP_AND_HOST_PORT(request, node, pc, primary);
request.config.ballot++;
RESET_IP_AND_HOST_PORT(request.config, primary);
maintain_drops(request.config.hp_last_drops, pc.hp_primary, request.type);
@@ -1912,9 +1899,10 @@ void
server_state::downgrade_secondary_to_inactive(std::shared_ptr<app_state> &a
request.info = *app;
request.config = pc;
request.type = config_type::CT_DOWNGRADE_TO_INACTIVE;
- request.node = dsn::dns_resolver::instance().resolve_address(node);
- request.__set_hp_node(node);
- send_proposal(pc.hp_primary, request);
+ SET_IP_AND_HOST_PORT_BY_DNS(request, node, node);
+ host_port primary;
+ GET_HOST_PORT(pc, primary, primary);
+ send_proposal(primary, request);
} else {
LOG_INFO("gpid({}.{}) is syncing with remote storage, ignore the
remove seconary({})",
app->app_id,
@@ -1932,8 +1920,7 @@ void
server_state::downgrade_stateless_nodes(std::shared_ptr<app_state> &app,
req->info = *app;
req->type = config_type::CT_REMOVE;
req->host_node = dsn::dns_resolver::instance().resolve_address(address);
- req->node.set_invalid();
- req->hp_node.reset();
+ RESET_IP_AND_HOST_PORT(*req, node);
req->config = app->partitions[pidx];
config_context &cc = app->helpers->contexts[pidx];
@@ -1942,13 +1929,15 @@ void
server_state::downgrade_stateless_nodes(std::shared_ptr<app_state> &app,
unsigned i = 0;
for (; i < pc.hp_secondaries.size(); ++i) {
if (pc.hp_secondaries[i] == address) {
- req->node = pc.last_drops[i];
- req->__set_hp_node(pc.hp_last_drops[i]);
+ SET_OBJ_IP_AND_HOST_PORT(*req, node, pc, last_drops[i]);
break;
}
}
- CHECK(req->node, "invalid node address, address = {}", req->node);
- // remove host_node & node from secondaries/last_drops, as it will be sync
to remote storage
+ host_port node;
+ GET_HOST_PORT(*req, node, node);
+ CHECK(node, "invalid node: {}", node);
+ // remove host_node & node from secondaries/last_drops, as it will be sync
to remote
+ // storage
for (++i; i < pc.hp_secondaries.size(); ++i) {
pc.secondaries[i - 1] = pc.secondaries[i];
pc.last_drops[i - 1] = pc.last_drops[i];
@@ -1965,7 +1954,7 @@ void
server_state::downgrade_stateless_nodes(std::shared_ptr<app_state> &app,
"removing host({}) worker({})",
pc.pid,
req->host_node,
- req->node);
+ node);
cc.cancel_sync();
}
cc.stage = config_status::pending_remote_sync;
@@ -2317,8 +2306,8 @@ server_state::sync_apps_from_replica_nodes(const
std::vector<dsn::host_port> &re
LOG_INFO("send query app and replica request to node({})",
replica_nodes[i]);
auto app_query_req = std::make_unique<query_app_info_request>();
- app_query_req->meta_server = dsn_primary_address();
- app_query_req->__set_hp_meta_server(dsn_primary_host_port());
+ SET_IP_AND_HOST_PORT(
+ *app_query_req, meta_server, dsn_primary_address(),
dsn_primary_host_port());
query_app_info_rpc app_rpc(std::move(app_query_req),
RPC_QUERY_APP_INFO);
const auto &addr =
dsn::dns_resolver::instance().resolve_address(replica_nodes[i]);
app_rpc.call(addr,
@@ -2428,19 +2417,14 @@ void server_state::on_start_recovery(const
configuration_recovery_request &req,
configuration_recovery_response &resp)
{
LOG_INFO("start recovery, node_count = {}, skip_bad_nodes = {},
skip_lost_partitions = {}",
- req.recovery_set.size(),
+ req.recovery_nodes.size(),
req.skip_bad_nodes ? "true" : "false",
req.skip_lost_partitions ? "true" : "false");
- if (req.__isset.hp_recovery_set) {
- resp.err = sync_apps_from_replica_nodes(
- req.hp_recovery_set, req.skip_bad_nodes, req.skip_lost_partitions,
resp.hint_message);
- } else {
- auto hp_recovery_set = std::vector<host_port>();
- host_port::fill_host_ports_from_addresses(req.recovery_set,
hp_recovery_set);
- resp.err = sync_apps_from_replica_nodes(
- hp_recovery_set, req.skip_bad_nodes, req.skip_lost_partitions,
resp.hint_message);
- }
+ std::vector<host_port> recovery_nodes;
+ GET_HOST_PORTS(req, recovery_nodes, recovery_nodes);
+ resp.err = sync_apps_from_replica_nodes(
+ recovery_nodes, req.skip_bad_nodes, req.skip_lost_partitions,
resp.hint_message);
if (resp.err != dsn::ERR_OK) {
LOG_ERROR("sync apps from replica nodes failed when do recovery, err =
{}", resp.err);
@@ -2591,7 +2575,8 @@ bool server_state::check_all_partitions()
gpid &pid = add_secondary_gpids[i];
partition_configuration &pc = *get_config(_all_apps, pid);
if (!add_secondary_proposed[i] && pc.hp_secondaries.empty()) {
- configuration_proposal_action &action = add_secondary_actions[i];
+ const auto &action = add_secondary_actions[i];
+ CHECK(action.hp_node, "");
if (_add_secondary_enable_flow_control &&
add_secondary_running_nodes[action.hp_node] >=
_add_secondary_max_count_for_one_node) {
@@ -2609,7 +2594,8 @@ bool server_state::check_all_partitions()
// assign secondary for all
for (int i = 0; i < add_secondary_actions.size(); ++i) {
if (!add_secondary_proposed[i]) {
- configuration_proposal_action &action = add_secondary_actions[i];
+ const auto &action = add_secondary_actions[i];
+ CHECK(action.hp_node, "");
gpid pid = add_secondary_gpids[i];
partition_configuration &pc = *get_config(_all_apps, pid);
if (_add_secondary_enable_flow_control &&
@@ -2619,8 +2605,8 @@ bool server_state::check_all_partitions()
"{}, node = {}",
::dsn::enum_to_string(action.type),
pc.pid,
- action.target,
- action.node);
+ FMT_HOST_PORT_AND_IP(action, target),
+ FMT_HOST_PORT_AND_IP(action, node));
continue;
}
std::shared_ptr<app_state> app = get_app(pid.get_app_id());
diff --git a/src/meta/test/balancer_validator.cpp
b/src/meta/test/balancer_validator.cpp
index 9a9d0599a..9b610f003 100644
--- a/src/meta/test/balancer_validator.cpp
+++ b/src/meta/test/balancer_validator.cpp
@@ -59,7 +59,7 @@ static void check_cure(app_mapper &apps, node_mapper &nodes,
::dsn::partition_co
meta_service svc;
partition_guardian guardian(&svc);
pc_status ps = pc_status::invalid;
- node_state *ns;
+ node_state *ns = nullptr;
configuration_proposal_action act;
while (ps != pc_status::healthy) {
@@ -67,27 +67,34 @@ static void check_cure(app_mapper &apps, node_mapper
&nodes, ::dsn::partition_co
if (act.type == config_type::CT_INVALID)
break;
switch (act.type) {
- case config_type::CT_ASSIGN_PRIMARY:
+ case config_type::CT_ASSIGN_PRIMARY: {
+ CHECK(!pc.primary, "");
CHECK(!pc.hp_primary, "");
+ CHECK(pc.secondaries.empty(), "");
CHECK(pc.hp_secondaries.empty(), "");
+ CHECK_EQ(act.node, act.target);
CHECK_EQ(act.hp_node, act.hp_target);
- CHECK(nodes.find(act.hp_node) != nodes.end(), "");
-
- CHECK_EQ(nodes[act.hp_node].served_as(pc.pid),
partition_status::PS_INACTIVE);
- nodes[act.hp_node].put_partition(pc.pid, true);
- SET_IP_AND_HOST_PORT(pc, primary, act.node, act.hp_node);
+ const auto node = nodes.find(act.hp_node);
+ CHECK(node != nodes.end(), "");
+ ns = &node->second;
+ CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE);
+ ns->put_partition(pc.pid, true);
+ SET_OBJ_IP_AND_HOST_PORT(pc, primary, act, node);
break;
-
- case config_type::CT_ADD_SECONDARY:
+ }
+ case config_type::CT_ADD_SECONDARY: {
+ CHECK(!is_member(pc, act.node), "");
CHECK(!is_member(pc, act.hp_node), "");
+ CHECK_EQ(pc.primary, act.target);
CHECK_EQ(pc.hp_primary, act.hp_target);
- CHECK(nodes.find(act.hp_node) != nodes.end(), "");
+ const auto node = nodes.find(act.hp_node);
+ CHECK(node != nodes.end(), "");
ADD_IP_AND_HOST_PORT(pc, secondaries, act.node, act.hp_node);
- ns = &nodes[act.hp_node];
+ ns = &node->second;
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE);
ns->put_partition(pc.pid, false);
break;
-
+ }
default:
CHECK(false, "");
break;
@@ -101,16 +108,18 @@ static void check_cure(app_mapper &apps, node_mapper
&nodes, ::dsn::partition_co
ps = guardian.cure({&apps, &nodes}, pc.pid, act);
CHECK_EQ(act.type, config_type::CT_UPGRADE_TO_PRIMARY);
+ CHECK(!pc.primary, "");
CHECK(!pc.hp_primary, "");
+ CHECK_EQ(act.node, act.target);
CHECK_EQ(act.hp_node, act.hp_target);
+ CHECK(is_secondary(pc, act.node), "");
CHECK(is_secondary(pc, act.hp_node), "");
- CHECK(nodes.find(act.hp_node) != nodes.end(), "");
-
- ns = &nodes[act.hp_node];
- SET_IP_AND_HOST_PORT(pc, primary, act.node, act.hp_node);
+ const auto node = nodes.find(act.hp_node);
+ CHECK(node != nodes.end(), "");
+ ns = &node->second;
+ SET_OBJ_IP_AND_HOST_PORT(pc, primary, act, node);
std::remove(pc.secondaries.begin(), pc.secondaries.end(), pc.primary);
std::remove(pc.hp_secondaries.begin(), pc.hp_secondaries.end(),
pc.hp_primary);
-
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_SECONDARY);
ns->put_partition(pc.pid, true);
}
diff --git a/src/meta/test/meta_partition_guardian_test.cpp
b/src/meta/test/meta_partition_guardian_test.cpp
index 0281d8038..dbc3bf08e 100644
--- a/src/meta/test/meta_partition_guardian_test.cpp
+++ b/src/meta/test/meta_partition_guardian_test.cpp
@@ -54,6 +54,7 @@
#include "meta_service_test_app.h"
#include "meta_test_base.h"
#include "metadata_types.h"
+#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/rpc/rpc_message.h"
@@ -63,6 +64,7 @@
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
+#include "utils/fmt_logging.h"
namespace dsn {
namespace replication {
@@ -78,7 +80,8 @@ static void apply_update_request(/*in-out*/
configuration_update_request &update
switch (update_req.type) {
case config_type::CT_ASSIGN_PRIMARY:
case config_type::CT_UPGRADE_TO_PRIMARY:
- SET_IP_AND_HOST_PORT(pc, primary, update_req.node, update_req.hp_node);
+ SET_OBJ_IP_AND_HOST_PORT(pc, primary, update_req, node);
+ // TODO(yingchun): optimize the following code
replica_helper::remove_node(update_req.node, pc.secondaries);
replica_helper::remove_node(update_req.hp_node, pc.hp_secondaries);
break;
@@ -92,8 +95,10 @@ static void apply_update_request(/*in-out*/
configuration_update_request &update
case config_type::CT_REMOVE:
case config_type::CT_DOWNGRADE_TO_INACTIVE:
if (update_req.hp_node == pc.hp_primary) {
+ CHECK_EQ(update_req.node, pc.primary);
RESET_IP_AND_HOST_PORT(pc, primary);
} else {
+ CHECK_NE(update_req.node, pc.primary);
replica_helper::remove_node(update_req.node, pc.secondaries);
replica_helper::remove_node(update_req.hp_node, pc.hp_secondaries);
}
@@ -230,7 +235,9 @@ void meta_partition_guardian_test::cure_test()
EXPECT_EQ(update_req->type, config_type::CT_UPGRADE_TO_PRIMARY);
EXPECT_TRUE(is_secondary(pc, update_req->hp_node));
+ EXPECT_TRUE(is_secondary(pc, update_req->node));
EXPECT_EQ(target, update_req->hp_node);
+ EXPECT_EQ(dsn::dns_resolver::instance().resolve_address(target),
update_req->node);
last_addr = update_req->hp_node;
proposal_sent = true;
@@ -253,7 +260,9 @@ void meta_partition_guardian_test::cure_test()
EXPECT_EQ(config_type::CT_UPGRADE_TO_PRIMARY, update_req->type);
EXPECT_EQ(update_req->hp_node, last_addr);
+ EXPECT_EQ(update_req->node,
dsn::dns_resolver::instance().resolve_address(last_addr));
EXPECT_EQ(target, update_req->hp_node);
+ EXPECT_EQ(dsn::dns_resolver::instance().resolve_address(target),
update_req->node);
proposal_sent = true;
apply_update_request(*update_req);
@@ -290,7 +299,9 @@ void meta_partition_guardian_test::cure_test()
EXPECT_EQ(update_req->type, config_type::CT_UPGRADE_TO_PRIMARY);
EXPECT_TRUE(is_secondary(pc, update_req->hp_node));
+ EXPECT_TRUE(is_secondary(pc, update_req->node));
EXPECT_EQ(target, update_req->hp_node);
+ EXPECT_EQ(dsn::dns_resolver::instance().resolve_address(target),
update_req->node);
proposal_sent = true;
last_addr = update_req->hp_node;
@@ -314,7 +325,9 @@ void meta_partition_guardian_test::cure_test()
EXPECT_EQ(update_req->type, config_type::CT_UPGRADE_TO_PRIMARY);
EXPECT_TRUE(is_secondary(pc, update_req->hp_node));
+ EXPECT_TRUE(is_secondary(pc, update_req->node));
EXPECT_EQ(target, update_req->hp_node);
+ EXPECT_EQ(dsn::dns_resolver::instance().resolve_address(target),
update_req->node);
EXPECT_NE(target, last_addr);
proposal_sent = true;
@@ -351,6 +364,7 @@ void meta_partition_guardian_test::cure_test()
EXPECT_EQ(update_req->type, config_type::CT_ADD_SECONDARY);
EXPECT_FALSE(is_secondary(pc, update_req->hp_node));
+ EXPECT_FALSE(is_secondary(pc, update_req->node));
EXPECT_EQ(target, nodes[0]);
last_addr = update_req->hp_node;
@@ -374,6 +388,7 @@ void meta_partition_guardian_test::cure_test()
EXPECT_EQ(update_req->type, config_type::CT_ADD_SECONDARY);
EXPECT_EQ(update_req->hp_node, last_addr);
+ EXPECT_EQ(update_req->node,
dsn::dns_resolver::instance().resolve_address(last_addr));
EXPECT_EQ(target, nodes[0]);
proposal_sent = true;
@@ -410,12 +425,12 @@ void meta_partition_guardian_test::cure_test()
EXPECT_EQ(update_req->type, config_type::CT_ADD_SECONDARY);
EXPECT_FALSE(is_secondary(pc, update_req->hp_node));
+ EXPECT_FALSE(is_secondary(pc, update_req->node));
EXPECT_EQ(target, nodes[0]);
update_req->config.ballot++;
update_req->type = config_type::CT_DOWNGRADE_TO_INACTIVE;
- update_req->node = update_req->config.secondaries[0];
- update_req->hp_node = update_req->config.hp_secondaries[0];
+ SET_OBJ_IP_AND_HOST_PORT(*update_req, node, update_req->config,
secondaries[0]);
CLEAR_IP_AND_HOST_PORT(update_req->config, secondaries);
proposal_sent = true;
@@ -452,6 +467,7 @@ void meta_partition_guardian_test::cure_test()
EXPECT_EQ(update_req->type, config_type::CT_ADD_SECONDARY);
EXPECT_FALSE(is_secondary(pc, update_req->hp_node));
+ EXPECT_FALSE(is_secondary(pc, update_req->node));
EXPECT_EQ(target, nodes[0]);
last_addr = update_req->hp_node;
@@ -477,6 +493,7 @@ void meta_partition_guardian_test::cure_test()
EXPECT_EQ(update_req->type, config_type::CT_ADD_SECONDARY);
EXPECT_NE(update_req->hp_node, last_addr);
EXPECT_FALSE(is_secondary(pc, update_req->hp_node));
+ EXPECT_FALSE(is_secondary(pc, update_req->node));
EXPECT_EQ(target, nodes[0]);
proposal_sent = true;
@@ -514,7 +531,9 @@ void meta_partition_guardian_test::cure_test()
EXPECT_EQ(update_req->type, config_type::CT_ADD_SECONDARY);
EXPECT_FALSE(is_secondary(pc, update_req->hp_node));
+ EXPECT_FALSE(is_secondary(pc, update_req->node));
EXPECT_EQ(target, pc.hp_primary);
+ EXPECT_EQ(dsn::dns_resolver::instance().resolve_address(target),
pc.primary);
proposal_sent = true;
svc->set_node_state({pc.hp_primary}, false);
@@ -815,8 +834,7 @@ void meta_partition_guardian_test::cure()
fake_request.info = *the_app;
fake_request.config = the_app->partitions[i];
fake_request.type = action.type;
- fake_request.node = action.node;
- fake_request.__set_hp_node(action.hp_node);
+ SET_OBJ_IP_AND_HOST_PORT(fake_request, node, action, node);
fake_request.host_node = action.node;
guardian.reconfig({&app, &nodes}, fake_request);
diff --git a/src/meta/test/meta_split_service_test.cpp
b/src/meta/test/meta_split_service_test.cpp
index bf816f773..1689038a9 100644
--- a/src/meta/test/meta_split_service_test.cpp
+++ b/src/meta/test/meta_split_service_test.cpp
@@ -59,7 +59,6 @@
#include "meta_test_base.h"
#include "metadata_types.h"
#include "partition_split_types.h"
-#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/blob.h"
@@ -380,7 +379,6 @@ public:
const int32_t PARENT_INDEX = 0;
const int32_t CHILD_INDEX = 4;
const host_port NODE = host_port("localhost", 10086);
- const rpc_address NODE_ADDR =
dsn::dns_resolver::instance().resolve_address(NODE);
std::shared_ptr<app_state> app;
};
@@ -508,8 +506,7 @@ TEST_F(meta_split_service_test, on_config_sync_test)
info1.pid = pid1;
info2.pid = pid2;
configuration_query_by_node_request req;
- req.node = NODE_ADDR;
- req.__set_hp_node(NODE);
+ SET_IP_AND_HOST_PORT_BY_DNS(req, node, NODE);
req.__isset.stored_replicas = true;
req.stored_replicas.emplace_back(info1);
req.stored_replicas.emplace_back(info2);
diff --git a/src/meta/test/misc/misc.cpp b/src/meta/test/misc/misc.cpp
index f3dcdaf36..149f0735e 100644
--- a/src/meta/test/misc/misc.cpp
+++ b/src/meta/test/misc/misc.cpp
@@ -134,6 +134,7 @@ void generate_app(/*out*/ std::shared_ptr<app_state> &app,
CHECK(pc.hp_primary, "");
CHECK(!is_secondary(pc, pc.hp_primary), "");
+ CHECK(!is_secondary(pc, pc.primary), "");
CHECK_EQ(pc.hp_secondaries.size(), 2);
CHECK_NE(pc.hp_secondaries[0], pc.hp_secondaries[1]);
}
@@ -287,7 +288,7 @@ void proposal_action_check_and_apply(const
configuration_proposal_action &act,
nodes_fs_manager *manager)
{
dsn::partition_configuration &pc = *get_config(apps, pid);
- node_state *ns;
+ node_state *ns = nullptr;
++pc.ballot;
CHECK_NE(act.type, config_type::CT_INVALID);
@@ -303,88 +304,104 @@ void proposal_action_check_and_apply(const
configuration_proposal_action &act,
GET_HOST_PORT(act, node, hp_node);
switch (act.type) {
- case config_type::CT_ASSIGN_PRIMARY:
+ case config_type::CT_ASSIGN_PRIMARY: {
+ CHECK_EQ(act.hp_node, act.hp_target);
CHECK_EQ(act.node, act.target);
CHECK(!pc.hp_primary, "");
CHECK(!pc.primary, "");
CHECK(pc.hp_secondaries.empty(), "");
CHECK(pc.secondaries.empty(), "");
-
SET_IP_AND_HOST_PORT(pc, primary, act.node, hp_node);
- ns = &nodes[hp_node];
+ const auto node = nodes.find(hp_node);
+ CHECK(node != nodes.end(), "");
+ ns = &node->second;
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE);
ns->put_partition(pc.pid, true);
break;
-
- case config_type::CT_ADD_SECONDARY:
+ }
+ case config_type::CT_ADD_SECONDARY: {
CHECK_EQ(hp_target, pc.hp_primary);
+ CHECK_EQ(act.hp_target, pc.hp_primary);
CHECK_EQ(act.target, pc.primary);
CHECK(!is_member(pc, hp_node), "");
-
+ CHECK(!is_member(pc, act.node), "");
ADD_IP_AND_HOST_PORT(pc, secondaries, act.node, hp_node);
- ns = &nodes[hp_node];
+ const auto node = nodes.find(hp_node);
+ CHECK(node != nodes.end(), "");
+ ns = &node->second;
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE);
ns->put_partition(pc.pid, false);
-
break;
-
- case config_type::CT_DOWNGRADE_TO_SECONDARY:
+ }
+ case config_type::CT_DOWNGRADE_TO_SECONDARY: {
+ CHECK_EQ(act.hp_node, act.hp_target);
CHECK_EQ(act.node, act.target);
CHECK_EQ(hp_node, hp_target);
+ CHECK_EQ(act.hp_node, pc.hp_primary);
CHECK_EQ(act.node, pc.primary);
CHECK_EQ(hp_node, pc.hp_primary);
- CHECK(nodes.find(hp_node) != nodes.end(), "");
CHECK(!is_secondary(pc, pc.hp_primary), "");
- nodes[hp_node].remove_partition(pc.pid, true);
+ CHECK(!is_secondary(pc, pc.primary), "");
+ const auto node = nodes.find(hp_node);
+ CHECK(node != nodes.end(), "");
+ ns = &node->second;
+ ns->remove_partition(pc.pid, true);
ADD_IP_AND_HOST_PORT(pc, secondaries, pc.primary, pc.hp_primary);
RESET_IP_AND_HOST_PORT(pc, primary);
break;
-
- case config_type::CT_UPGRADE_TO_PRIMARY:
+ }
+ case config_type::CT_UPGRADE_TO_PRIMARY: {
CHECK(!pc.hp_primary, "");
CHECK(!pc.primary, "");
CHECK_EQ(hp_node, hp_target);
+ CHECK_EQ(act.hp_node, act.hp_target);
CHECK_EQ(act.node, act.target);
CHECK(is_secondary(pc, hp_node), "");
- CHECK(nodes.find(hp_node) != nodes.end(), "");
-
- ns = &nodes[hp_node];
+ CHECK(is_secondary(pc, act.node), "");
+ const auto node = nodes.find(hp_node);
+ CHECK(node != nodes.end(), "");
+ ns = &node->second;
SET_IP_AND_HOST_PORT(pc, primary, act.node, hp_node);
CHECK(replica_helper::remove_node(hp_node, pc.hp_secondaries), "");
CHECK(replica_helper::remove_node(act.node, pc.secondaries), "");
ns->put_partition(pc.pid, true);
break;
-
- case config_type::CT_ADD_SECONDARY_FOR_LB:
+ }
+ case config_type::CT_ADD_SECONDARY_FOR_LB: {
CHECK_EQ(hp_target, pc.hp_primary);
+ CHECK_EQ(act.hp_target, pc.hp_primary);
CHECK_EQ(act.target, pc.primary);
CHECK(!is_member(pc, hp_node), "");
+ CHECK(!is_member(pc, act.node), "");
CHECK(act.hp_node, "");
CHECK(act.node, "");
ADD_IP_AND_HOST_PORT(pc, secondaries, act.node, hp_node);
-
- ns = &nodes[hp_node];
+ const auto node = nodes.find(hp_node);
+ CHECK(node != nodes.end(), "");
+ ns = &node->second;
ns->put_partition(pc.pid, false);
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_SECONDARY);
break;
-
+ }
// in balancer, remove primary is not allowed
case config_type::CT_REMOVE:
- case config_type::CT_DOWNGRADE_TO_INACTIVE:
+ case config_type::CT_DOWNGRADE_TO_INACTIVE: {
CHECK(pc.hp_primary, "");
CHECK(pc.primary, "");
CHECK_EQ(pc.hp_primary, hp_target);
+ CHECK_EQ(pc.hp_primary, act.hp_target);
CHECK_EQ(pc.primary, act.target);
CHECK(is_secondary(pc, hp_node), "");
- CHECK(nodes.find(hp_node) != nodes.end(), "");
+ CHECK(is_secondary(pc, act.node), "");
CHECK(replica_helper::remove_node(hp_node, pc.hp_secondaries), "");
CHECK(replica_helper::remove_node(act.node, pc.secondaries), "");
-
- ns = &nodes[hp_node];
+ const auto node = nodes.find(hp_node);
+ CHECK(node != nodes.end(), "");
+ ns = &node->second;
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_SECONDARY);
ns->remove_partition(pc.pid, false);
break;
-
+ }
default:
CHECK(false, "");
break;
@@ -413,16 +430,15 @@ void migration_check_and_apply(app_mapper &apps,
CHECK(hp, "");
}
CHECK(!is_secondary(pc, pc.hp_primary), "");
+ CHECK(!is_secondary(pc, pc.primary), "");
for (unsigned int j = 0; j < proposal->action_list.size(); ++j) {
configuration_proposal_action &act = proposal->action_list[j];
- LOG_DEBUG("the {}th round of action, type: {}, node: {}({}),
target: {}({})",
+ LOG_DEBUG("the {}th round of action, type: {}, node: {}, target:
{}",
j,
dsn::enum_to_string(act.type),
- act.hp_node,
- act.node,
- act.hp_target,
- act.target);
+ FMT_HOST_PORT_AND_IP(act, node),
+ FMT_HOST_PORT_AND_IP(act, target));
proposal_action_check_and_apply(act, proposal->gpid, apps, nodes,
manager);
}
}
diff --git a/src/meta/test/update_configuration_test.cpp
b/src/meta/test/update_configuration_test.cpp
index c824fe561..d8a947f28 100644
--- a/src/meta/test/update_configuration_test.cpp
+++ b/src/meta/test/update_configuration_test.cpp
@@ -54,7 +54,6 @@
#include "meta_admin_types.h"
#include "meta_service_test_app.h"
#include "metadata_types.h"
-#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_holder.h"
#include "runtime/rpc/rpc_host_port.h"
@@ -107,7 +106,7 @@ public:
switch (update_req->type) {
case config_type::CT_ASSIGN_PRIMARY:
case config_type::CT_UPGRADE_TO_PRIMARY:
- SET_IP_AND_HOST_PORT(pc, primary, update_req->node,
update_req->hp_node);
+ SET_OBJ_IP_AND_HOST_PORT(pc, primary, *update_req, node);
replica_helper::remove_node(update_req->node, pc.secondaries);
replica_helper::remove_node(update_req->hp_node,
pc.hp_secondaries);
break;
@@ -121,8 +120,10 @@ public:
case config_type::CT_REMOVE:
case config_type::CT_DOWNGRADE_TO_INACTIVE:
if (update_req->hp_node == pc.hp_primary) {
+ CHECK_EQ(update_req->node, pc.primary);
RESET_IP_AND_HOST_PORT(pc, primary);
} else {
+ CHECK_NE(update_req->node, pc.primary);
replica_helper::remove_node(update_req->node, pc.secondaries);
replica_helper::remove_node(update_req->hp_node,
pc.hp_secondaries);
}
@@ -347,8 +348,7 @@ void meta_service_test_app::adjust_dropped_size()
req->config.ballot++;
SET_IPS_AND_HOST_PORTS_BY_DNS(req->config, secondaries, nodes[5]);
req->info = info;
- req->node = dsn::dns_resolver::instance().resolve_address(nodes[5]);
- req->__set_hp_node(nodes[5]);
+ SET_IP_AND_HOST_PORT_BY_DNS(*req, node, nodes[5]);
req->type = config_type::CT_UPGRADE_TO_SECONDARY;
call_update_configuration(svc.get(), req);
@@ -357,8 +357,7 @@ void meta_service_test_app::adjust_dropped_size()
// then receive a config_sync request fro nodes[4], which has less data
than node[3]
std::shared_ptr<configuration_query_by_node_request> req2 =
std::make_shared<configuration_query_by_node_request>();
- req2->node = dsn::dns_resolver::instance().resolve_address(nodes[4]);
- req2->__set_hp_node(nodes[4]);
+ SET_IP_AND_HOST_PORT_BY_DNS(*req2, node, nodes[4]);
replica_info rep_info;
rep_info.pid = pc.pid;
diff --git a/src/replica/replica.h b/src/replica/replica.h
index cf95eafeb..f367ab35a 100644
--- a/src/replica/replica.h
+++ b/src/replica/replica.h
@@ -388,7 +388,7 @@ private:
/////////////////////////////////////////////////////////////////
// reconfiguration
void assign_primary(configuration_update_request &proposal);
- void add_potential_secondary(configuration_update_request &proposal);
+ void add_potential_secondary(const configuration_update_request &proposal);
void upgrade_to_secondary_on_primary(const host_port &node);
void downgrade_to_secondary_on_primary(configuration_update_request
&proposal);
void downgrade_to_inactive_on_primary(configuration_update_request
&proposal);
diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp
index c35a8ad7b..ca60af2c4 100644
--- a/src/replica/replica_config.cpp
+++ b/src/replica/replica_config.cpp
@@ -98,8 +98,9 @@ void replica::on_config_proposal(configuration_update_request
&proposal)
{
_checker.only_one_thread_access();
- LOG_INFO_PREFIX(
- "process config proposal {} for {}", enum_to_string(proposal.type),
proposal.node);
+ LOG_INFO_PREFIX("process config proposal {} for {}",
+ enum_to_string(proposal.type),
+ FMT_HOST_PORT_AND_IP(proposal, node));
if (proposal.config.ballot < get_ballot()) {
LOG_WARNING_PREFIX(
@@ -145,7 +146,9 @@ void
replica::on_config_proposal(configuration_update_request &proposal)
void replica::assign_primary(configuration_update_request &proposal)
{
- CHECK_EQ(proposal.hp_node, _stub->primary_host_port());
+ host_port node;
+ GET_HOST_PORT(proposal, node, node);
+ CHECK_EQ(node, _stub->primary_host_port());
if (status() == partition_status::PS_PRIMARY) {
LOG_WARNING_PREFIX("invalid assgin primary proposal as the node is in
{}",
@@ -169,11 +172,11 @@ void replica::assign_primary(configuration_update_request
&proposal)
replica_helper::remove_node(_stub->primary_address(),
proposal.config.secondaries);
replica_helper::remove_node(_stub->primary_host_port(),
proposal.config.hp_secondaries);
- update_configuration_on_meta_server(proposal.type, proposal.hp_node,
proposal.config);
+ update_configuration_on_meta_server(proposal.type, node, proposal.config);
}
// run on primary to send ADD_LEARNER request to candidate replica server
-void replica::add_potential_secondary(configuration_update_request &proposal)
+void replica::add_potential_secondary(const configuration_update_request
&proposal)
{
if (status() != partition_status::PS_PRIMARY) {
LOG_WARNING_PREFIX("ignore add secondary proposal for invalid state,
state = {}",
@@ -185,18 +188,17 @@ void
replica::add_potential_secondary(configuration_update_request &proposal)
CHECK_EQ(proposal.config.pid, _primary_states.membership.pid);
CHECK_EQ(proposal.config.hp_primary,
_primary_states.membership.hp_primary);
CHECK(proposal.config.hp_secondaries ==
_primary_states.membership.hp_secondaries, "");
- CHECK(!_primary_states.check_exist(proposal.hp_node,
partition_status::PS_PRIMARY),
- "node = {}",
- proposal.hp_node);
- CHECK(!_primary_states.check_exist(proposal.hp_node,
partition_status::PS_SECONDARY),
- "node = {}",
- proposal.hp_node);
+
+ host_port node;
+ GET_HOST_PORT(proposal, node, node);
+ CHECK(!_primary_states.check_exist(node, partition_status::PS_PRIMARY),
"node = {}", node);
+ CHECK(!_primary_states.check_exist(node, partition_status::PS_SECONDARY),
"node = {}", node);
int potential_secondaries_count =
_primary_states.membership.hp_secondaries.size() +
_primary_states.learners.size();
if (potential_secondaries_count >=
_primary_states.membership.max_replica_count - 1) {
if (proposal.type == config_type::CT_ADD_SECONDARY) {
- if (_primary_states.learners.find(proposal.hp_node) ==
_primary_states.learners.end()) {
+ if (_primary_states.learners.find(node) ==
_primary_states.learners.end()) {
LOG_INFO_PREFIX(
"already have enough secondaries or potential secondaries,
ignore new "
"potential secondary proposal");
@@ -208,8 +210,7 @@ void
replica::add_potential_secondary(configuration_update_request &proposal)
"secondary proposal");
return;
} else {
- LOG_INFO_PREFIX("add a new secondary({}) for future load
balancer",
- proposal.hp_node);
+ LOG_INFO_PREFIX("add a new secondary({}) for future load
balancer", node);
}
} else {
CHECK(false, "invalid config_type, type = {}",
enum_to_string(proposal.type));
@@ -220,29 +221,30 @@ void
replica::add_potential_secondary(configuration_update_request &proposal)
state.prepare_start_decree = invalid_decree;
state.timeout_task = nullptr; // TODO: add timer for learner task
- auto it = _primary_states.learners.find(proposal.hp_node);
+ auto it = _primary_states.learners.find(node);
if (it != _primary_states.learners.end()) {
state.signature = it->second.signature;
} else {
state.signature = ++_primary_states.next_learning_version;
- _primary_states.learners[proposal.hp_node] = state;
- _primary_states.statuses[proposal.hp_node] =
partition_status::PS_POTENTIAL_SECONDARY;
+ _primary_states.learners[node] = state;
+ _primary_states.statuses[node] =
partition_status::PS_POTENTIAL_SECONDARY;
}
group_check_request request;
request.app = _app_info;
- request.node = proposal.node;
- request.__set_hp_node(proposal.hp_node);
+ SET_OBJ_IP_AND_HOST_PORT(request, node, proposal, node);
_primary_states.get_replica_config(
partition_status::PS_POTENTIAL_SECONDARY, request.config,
state.signature);
request.last_committed_decree = last_committed_decree();
LOG_INFO_PREFIX("call one way {} to start learning with signature
[{:#018x}]",
- proposal.hp_node,
+ FMT_HOST_PORT_AND_IP(proposal, node),
state.signature);
- rpc::call_one_way_typed(
- proposal.node, RPC_LEARN_ADD_LEARNER, request,
get_gpid().thread_hash());
+
rpc::call_one_way_typed(dsn::dns_resolver::instance().resolve_address(node),
+ RPC_LEARN_ADD_LEARNER,
+ request,
+ get_gpid().thread_hash());
}
void replica::upgrade_to_secondary_on_primary(const ::dsn::host_port &node)
@@ -259,13 +261,15 @@ void replica::upgrade_to_secondary_on_primary(const
::dsn::host_port &node)
void replica::downgrade_to_secondary_on_primary(configuration_update_request
&proposal)
{
- if (proposal.config.ballot != get_ballot() || status() !=
partition_status::PS_PRIMARY)
+ if (proposal.config.ballot != get_ballot() || status() !=
partition_status::PS_PRIMARY) {
return;
+ }
CHECK_EQ(proposal.config.pid, _primary_states.membership.pid);
CHECK_EQ(proposal.config.hp_primary,
_primary_states.membership.hp_primary);
CHECK(proposal.config.hp_secondaries ==
_primary_states.membership.hp_secondaries, "");
CHECK_EQ(proposal.hp_node, proposal.config.hp_primary);
+ CHECK_EQ(proposal.node, proposal.config.primary);
RESET_IP_AND_HOST_PORT(proposal.config, primary);
ADD_IP_AND_HOST_PORT(proposal.config, secondaries, proposal.node,
proposal.hp_node);
@@ -282,17 +286,23 @@ void
replica::downgrade_to_inactive_on_primary(configuration_update_request &pro
CHECK_EQ(proposal.config.hp_primary,
_primary_states.membership.hp_primary);
CHECK(proposal.config.hp_secondaries ==
_primary_states.membership.hp_secondaries, "");
- if (proposal.hp_node == proposal.config.hp_primary) {
+ host_port node;
+ GET_HOST_PORT(proposal, node, node);
+ if (node == proposal.config.hp_primary) {
+ CHECK_EQ(proposal.node, proposal.config.primary);
RESET_IP_AND_HOST_PORT(proposal.config, primary);
} else {
- CHECK(replica_helper::remove_node(proposal.node,
proposal.config.secondaries) &&
- replica_helper::remove_node(proposal.hp_node,
proposal.config.hp_secondaries),
+ CHECK_NE(proposal.node, proposal.config.primary);
+ CHECK(replica_helper::remove_node(proposal.node,
proposal.config.secondaries),
"remove node failed, node = {}",
proposal.node);
+ CHECK(replica_helper::remove_node(node,
proposal.config.hp_secondaries),
+ "remove node failed, node = {}",
+ node);
}
update_configuration_on_meta_server(
- config_type::CT_DOWNGRADE_TO_INACTIVE, proposal.hp_node,
proposal.config);
+ config_type::CT_DOWNGRADE_TO_INACTIVE, node, proposal.config);
}
void replica::remove(configuration_update_request &proposal)
@@ -304,18 +314,23 @@ void replica::remove(configuration_update_request
&proposal)
CHECK_EQ(proposal.config.hp_primary,
_primary_states.membership.hp_primary);
CHECK(proposal.config.hp_secondaries ==
_primary_states.membership.hp_secondaries, "");
- auto st =
_primary_states.get_node_status(host_port::from_address(proposal.node));
+ host_port node;
+ GET_HOST_PORT(proposal, node, node);
+ auto st = _primary_states.get_node_status(node);
switch (st) {
case partition_status::PS_PRIMARY:
- CHECK_EQ(proposal.config.hp_primary, proposal.hp_node);
+ CHECK_EQ(proposal.config.hp_primary, node);
+ CHECK_EQ(proposal.config.primary, proposal.node);
RESET_IP_AND_HOST_PORT(proposal.config, primary);
break;
case partition_status::PS_SECONDARY: {
- CHECK(replica_helper::remove_node(proposal.node,
proposal.config.secondaries) &&
- replica_helper::remove_node(proposal.hp_node,
proposal.config.hp_secondaries),
- "remove_node failed, node = {}",
+ CHECK(replica_helper::remove_node(proposal.node,
proposal.config.secondaries),
+ "remove node failed, node = {}",
proposal.node);
+ CHECK(replica_helper::remove_node(node,
proposal.config.hp_secondaries),
+ "remove_node failed, node = {}",
+ node);
} break;
case partition_status::PS_POTENTIAL_SECONDARY:
break;
@@ -323,7 +338,7 @@ void replica::remove(configuration_update_request &proposal)
break;
}
- update_configuration_on_meta_server(config_type::CT_REMOVE,
proposal.hp_node, proposal.config);
+ update_configuration_on_meta_server(config_type::CT_REMOVE, node,
proposal.config);
}
// from primary
@@ -390,8 +405,7 @@ void
replica::update_configuration_on_meta_server(config_type::type type,
request->config = new_config;
request->config.ballot++;
request->type = type;
- request->node = dsn::dns_resolver::instance().resolve_address(node);
- request->__set_hp_node(node);
+ SET_IP_AND_HOST_PORT_BY_DNS(*request, node, node);
::dsn::marshall(msg, *request);
@@ -403,7 +417,7 @@ void
replica::update_configuration_on_meta_server(config_type::type type,
"send update configuration request to meta server, ballot = {}, type =
{}, node = {}",
request->config.ballot,
enum_to_string(request->type),
- request->node);
+ FMT_HOST_PORT_AND_IP(*request, node));
rpc_address target(
dsn::dns_resolver::instance().resolve_address(_stub->_failure_detector->get_servers()));
@@ -498,14 +512,20 @@ void
replica::on_update_configuration_on_meta_server_reply(
case config_type::CT_DOWNGRADE_TO_INACTIVE:
case config_type::CT_UPGRADE_TO_SECONDARY:
break;
- case config_type::CT_REMOVE:
- if (req->hp_node != _stub->primary_host_port()) {
+ case config_type::CT_REMOVE: {
+ host_port node;
+ GET_HOST_PORT(*req, node, node);
+ if (node != _stub->primary_host_port()) {
+ CHECK_NE(req->node, _stub->primary_address());
replica_configuration rconfig;
- replica_helper::get_replica_config(resp.config, req->hp_node,
rconfig);
- rpc::call_one_way_typed(
- req->node, RPC_REMOVE_REPLICA, rconfig,
get_gpid().thread_hash());
+ replica_helper::get_replica_config(resp.config, node, rconfig);
+
rpc::call_one_way_typed(dsn::dns_resolver::instance().resolve_address(node),
+ RPC_REMOVE_REPLICA,
+ rconfig,
+ get_gpid().thread_hash());
}
break;
+ }
case config_type::CT_PRIMARY_FORCE_UPDATE_BALLOT:
CHECK(_is_initializing, "");
_is_initializing = false;
diff --git a/src/replica/replica_failover.cpp b/src/replica/replica_failover.cpp
index 3dab15c8d..6cd7895f7 100644
--- a/src/replica/replica_failover.cpp
+++ b/src/replica/replica_failover.cpp
@@ -36,13 +36,12 @@
#include "replica.h"
#include "replica/replica_context.h"
#include "replica_stub.h"
-#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
+#include "runtime/rpc/rpc_host_port.h"
#include "utils/error_code.h"
#include "utils/fmt_logging.h"
namespace dsn {
-class host_port;
namespace replication {
@@ -87,8 +86,7 @@ void replica::handle_remote_failure(partition_status::type st,
enum_to_string(st));
{
configuration_update_request request;
- request.node = dsn::dns_resolver::instance().resolve_address(node);
- request.__set_hp_node(node);
+ SET_IP_AND_HOST_PORT_BY_DNS(request, node, node);
request.type = config_type::CT_DOWNGRADE_TO_INACTIVE;
request.config = _primary_states.membership;
downgrade_to_inactive_on_primary(request);
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 136556efc..fc36bd98a 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -841,16 +841,15 @@ void replica_stub::on_config_proposal(const
configuration_update_request &propos
proposal.config.pid,
_primary_host_port_cache,
enum_to_string(proposal.type),
- proposal.node);
+ FMT_HOST_PORT_AND_IP(proposal, node));
return;
}
- LOG_INFO("{}@{}: received config proposal {} for {}({})",
+ LOG_INFO("{}@{}: received config proposal {} for {}",
proposal.config.pid,
_primary_host_port_cache,
enum_to_string(proposal.type),
- proposal.hp_node,
- proposal.node);
+ FMT_HOST_PORT_AND_IP(proposal, node));
replica_ptr rep = get_replica(proposal.config.pid);
if (rep == nullptr) {
@@ -975,7 +974,7 @@ void replica_stub::on_query_app_info(query_app_info_rpc rpc)
const query_app_info_request &req = rpc.request();
query_app_info_response &resp = rpc.response();
- LOG_INFO("got query app info request from ({})", req.meta_server);
+ LOG_INFO("got query app info request from ({})", FMT_HOST_PORT_AND_IP(req,
meta_server));
resp.err = dsn::ERR_OK;
std::set<app_id> visited_apps;
{
@@ -1258,8 +1257,7 @@ void replica_stub::query_configuration_by_node()
dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_CONFIG_SYNC);
configuration_query_by_node_request req;
- req.node = primary_address();
- req.__set_hp_node(_primary_host_port);
+ SET_IP_AND_HOST_PORT(req, node, primary_address(), _primary_host_port);
// TODO: send stored replicas may cost network, we shouldn't config the
frequency
get_local_replicas(req.stored_replicas);
@@ -1466,8 +1464,7 @@ void replica_stub::remove_replica_on_meta_server(const
app_info &info,
request->info = info;
request->config = config;
request->config.ballot++;
- request->node = primary_address();
- request->__set_hp_node(_primary_host_port);
+ SET_IP_AND_HOST_PORT(*request, node, primary_address(),
_primary_host_port);
request->type = config_type::CT_DOWNGRADE_TO_INACTIVE;
if (_primary_host_port == config.hp_primary) {
diff --git a/src/replica/storage/simple_kv/test/checker.cpp
b/src/replica/storage/simple_kv/test/checker.cpp
index 0aeae82e3..ae59ec495 100644
--- a/src/replica/storage/simple_kv/test/checker.cpp
+++ b/src/replica/storage/simple_kv/test/checker.cpp
@@ -50,7 +50,6 @@
#include "replica/replica_stub.h"
#include "replica/replication_service_app.h"
#include "replica/storage/simple_kv/test/common.h"
-#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_engine.h"
#include "runtime/service_app.h"
@@ -87,12 +86,10 @@ public:
pc_status result;
if (!pc.hp_primary) {
if (pc.hp_secondaries.size() > 0) {
- action.node = pc.secondaries[0];
- action.__set_hp_node(pc.hp_secondaries[0]);
+ SET_OBJ_IP_AND_HOST_PORT(action, node, pc, secondaries[0]);
for (unsigned int i = 1; i < pc.hp_secondaries.size(); ++i)
if (pc.hp_secondaries[i] < action.hp_node) {
- action.node = pc.secondaries[i];
- action.hp_node = pc.hp_secondaries[i];
+ SET_OBJ_IP_AND_HOST_PORT(action, node, pc,
secondaries[i]);
}
action.type = config_type::CT_UPGRADE_TO_PRIMARY;
result = pc_status::ill;
@@ -103,26 +100,23 @@ public:
sort_alive_nodes(*view.nodes,
server_load_balancer::primary_comparator(*view.nodes),
sort_result);
- action.node =
dsn::dns_resolver::instance().resolve_address(sort_result[0]);
- action.__set_hp_node(sort_result[0]);
+ SET_IP_AND_HOST_PORT_BY_DNS(action, node, sort_result[0]);
action.type = config_type::CT_ASSIGN_PRIMARY;
result = pc_status::ill;
}
// DDD
else {
- action.node = *pc.last_drops.rbegin();
- action.__set_hp_node(*pc.hp_last_drops.rbegin());
+ SET_IP_AND_HOST_PORT(
+ action, node, *pc.last_drops.rbegin(),
*pc.hp_last_drops.rbegin());
action.type = config_type::CT_ASSIGN_PRIMARY;
- LOG_ERROR("{} enters DDD state, we are waiting for its last
primary node {}({}) to "
+ LOG_ERROR("{} enters DDD state, we are waiting for its last
primary node {} to "
"come back ...",
pc.pid,
- action.hp_node,
- action.node);
+ FMT_HOST_PORT_AND_IP(action, node));
result = pc_status::dead;
}
- action.target = action.node;
- action.__set_hp_target(action.hp_node);
+ SET_OBJ_IP_AND_HOST_PORT(action, target, action, node);
}
else if (static_cast<int>(pc.hp_secondaries.size()) + 1 <
pc.max_replica_count) {
@@ -132,13 +126,11 @@ public:
for (auto &node : sort_result) {
if (!is_member(pc, node)) {
- action.node =
dsn::dns_resolver::instance().resolve_address(node);
- action.__set_hp_node(node);
+ SET_IP_AND_HOST_PORT_BY_DNS(action, node, node);
break;
}
}
- action.target = pc.primary;
- action.__set_hp_target(pc.hp_primary);
+ SET_OBJ_IP_AND_HOST_PORT(action, target, pc, primary);
action.type = config_type::CT_ADD_SECONDARY;
result = pc_status::ill;
} else {
diff --git a/src/replica/storage/simple_kv/test/client.cpp
b/src/replica/storage/simple_kv/test/client.cpp
index a8b813b2d..dbafc10d2 100644
--- a/src/replica/storage/simple_kv/test/client.cpp
+++ b/src/replica/storage/simple_kv/test/client.cpp
@@ -158,10 +158,8 @@ void simple_kv_client_app::send_config_to_meta(const
host_port &receiver,
request.gpid = g_default_gpid;
configuration_proposal_action act;
- act.target = dsn::dns_resolver::instance().resolve_address(receiver);
- act.node = dsn::dns_resolver::instance().resolve_address(node);
- act.__set_hp_target(receiver);
- act.__set_hp_node(node);
+ SET_IP_AND_HOST_PORT_BY_DNS(act, node, node);
+ SET_IP_AND_HOST_PORT_BY_DNS(act, target, receiver);
act.__set_type(type);
request.action_list.emplace_back(std::move(act));
request.__set_force(true);
diff --git a/src/replica/test/open_replica_test.cpp
b/src/replica/test/open_replica_test.cpp
index 92cedb0d8..4df92dda5 100644
--- a/src/replica/test/open_replica_test.cpp
+++ b/src/replica/test/open_replica_test.cpp
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <fmt/core.h>
#include <stdint.h>
#include <memory>
#include <string>
@@ -31,6 +32,7 @@
#include "replica/replica_stub.h"
#include "replica_test_base.h"
#include "runtime/rpc/rpc_address.h"
+#include "runtime/rpc/rpc_host_port.h"
#include "runtime/task/task.h"
#include "utils/filesystem.h"
@@ -66,7 +68,7 @@ TEST_P(open_replica_test,
open_replica_add_decree_and_ballot_check)
gpid pid(ai.app_id, i);
stub->_opening_replicas[pid] = task_ptr(nullptr);
- const auto node = rpc_address::from_ip_port("127.0.0.11", 12321 + i +
1);
+ const auto node = host_port::from_string(fmt::format("127.0.0.11:{}",
12321 + i + 1));
_replica->register_service();
@@ -80,7 +82,7 @@ TEST_P(open_replica_test,
open_replica_add_decree_and_ballot_check)
req->info = *as;
req->config = config;
req->type = config_type::CT_ASSIGN_PRIMARY;
- req->node = node;
+ SET_IP_AND_HOST_PORT_BY_DNS(*req, node, node);
if (test.expect_crash) {
ASSERT_DEATH(stub->open_replica(ai, pid, nullptr, req), "");
} else {
diff --git a/src/runtime/rpc/rpc_host_port.cpp
b/src/runtime/rpc/rpc_host_port.cpp
index 36953bb70..01a047417 100644
--- a/src/runtime/rpc/rpc_host_port.cpp
+++ b/src/runtime/rpc/rpc_host_port.cpp
@@ -216,13 +216,4 @@ error_s
host_port::resolve_addresses(std::vector<rpc_address> &addresses) const
return error_s::ok();
}
-void host_port::fill_host_ports_from_addresses(const std::vector<rpc_address>
&addr_v,
- std::vector<host_port> &hp_v)
-{
- CHECK(hp_v.empty(), "optional host_port should be empty!");
- for (const auto &addr : addr_v) {
- hp_v.emplace_back(host_port::from_address(addr));
- }
-}
-
} // namespace dsn
diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h
index 9e8b7a580..67fae8843 100644
--- a/src/runtime/rpc/rpc_host_port.h
+++ b/src/runtime/rpc/rpc_host_port.h
@@ -51,28 +51,68 @@ class TProtocol;
const auto &_obj = (obj);
\
auto &_target = (target);
\
if (_obj.__isset.hp_##field) {
\
+ DCHECK(_obj.field, "invalid address: {}", _obj.field);
\
+ DCHECK_EQ(_obj.field,
dsn::dns_resolver::instance().resolve_address(_obj.hp_##field)); \
_target = _obj.hp_##field;
\
} else {
\
_target = std::move(dsn::host_port::from_address(_obj.field));
\
}
\
} while (0)
+// Get a std::vector<host_port> from 'obj' and store it in 'target' (which must be empty). The
+// value is copied from the std::vector<host_port> field 'hp_<field>' if that field is set;
+// otherwise, it is reverse-resolved from the std::vector<rpc_address> field '<field>'.
+#define GET_HOST_PORTS(obj, field, target)
\
+ do {
\
+ const auto &_obj = (obj);
\
+ auto &_target = (target);
\
+ CHECK(_target.empty(), "");
\
+ if (_obj.__isset.hp_##field) {
\
+ DCHECK_EQ(_obj.field.size(), _obj.hp_##field.size());
\
+ _target = _obj.hp_##field;
\
+ } else {
\
+ _target.reserve(_obj.field.size());
\
+ for (const auto &addr : _obj.field) {
\
+ _target.emplace_back(host_port::from_address(addr));
\
+ }
\
+ }
\
+ } while (0)
+
// Set 'addr' and 'hp' to the '<field>' and optional 'hp_<field>' of 'obj'.
The types of the
// fields are rpc_address and host_port, respectively.
#define SET_IP_AND_HOST_PORT(obj, field, addr, hp)
\
do {
\
auto &_obj = (obj);
\
- _obj.field = (addr);
\
- _obj.__set_hp_##field(hp);
\
+ const auto &_addr = (addr);
\
+ const auto &_hp = (hp);
\
+ DCHECK_EQ(_addr, dsn::dns_resolver::instance().resolve_address(_hp));
\
+ _obj.field = _addr;
\
+ _obj.__set_hp_##field(_hp);
\
} while (0)
// GTest check whether the '<field>' and 'hp_<field>' of 'obj' equal to 'addr'
and 'hp'. The types
// of the fields are rpc_address and host_port, respectively.
#define ASSERT_IP_AND_HOST_PORT(obj, field, addr, hp)
\
do {
\
- auto &_obj = (obj);
\
- ASSERT_EQ((addr), _obj.field);
\
- ASSERT_EQ((hp), _obj.hp_##field);
\
+ const auto &_obj = (obj);
\
+ const auto &_addr = (addr);
\
+ const auto &_hp = (hp);
\
+ ASSERT_EQ(_addr, dsn::dns_resolver::instance().resolve_address(_hp));
\
+ ASSERT_EQ(_addr, _obj.field);
\
+ ASSERT_EQ(_hp, _obj.hp_##field);
\
+ } while (0)
+
+// Copy '<src_field>' and 'hp_<src_field>' of 'src_obj' into the '<dst_field>' and optional
+// 'hp_<dst_field>' of 'dst_obj'. The types of the fields are rpc_address and host_port,
+// respectively.
+#define SET_OBJ_IP_AND_HOST_PORT(dst_obj, dst_field, src_obj, src_field)
\
+ do {
\
+ const auto &_src_obj = (src_obj);
\
+ auto &_dst_obj = (dst_obj);
\
+ DCHECK_EQ(_src_obj.src_field,
\
+
dsn::dns_resolver::instance().resolve_address(_src_obj.hp_##src_field));
\
+ _dst_obj.dst_field = _src_obj.src_field;
\
+ _dst_obj.__set_hp_##dst_field(_src_obj.hp_##src_field);
\
} while (0)
// Set 'hp' and its DNS resolved rpc_address to the optional 'hp_<field>' and
'<field>' of 'obj'.
@@ -107,12 +147,15 @@ class TProtocol;
// of the fields are std::vector<rpc_address> and std::vector<host_port>,
respectively.
#define ADD_IP_AND_HOST_PORT(obj, field, addr, hp)
\
do {
\
+ const auto &_addr = (addr);
\
+ const auto &_hp = (hp);
\
+ DCHECK_EQ(_addr, dsn::dns_resolver::instance().resolve_address(_hp));
\
auto &_obj = (obj);
\
- _obj.field.push_back(addr);
\
+ _obj.field.push_back(_addr);
\
if (!_obj.__isset.hp_##field) {
\
- _obj.__set_hp_##field({hp});
\
+ _obj.__set_hp_##field({_hp});
\
} else {
\
- _obj.hp_##field.push_back(hp);
\
+ _obj.hp_##field.push_back(_hp);
\
}
\
} while (0)
@@ -122,27 +165,28 @@ class TProtocol;
#define ADD_IP_AND_HOST_PORT_BY_DNS(obj, field, hp)
\
do {
\
auto &_obj = (obj);
\
- auto &_hp = (hp);
\
+ const auto &_hp = (hp);
\
_obj.field.push_back(dsn::dns_resolver::instance().resolve_address(_hp));
\
if (!_obj.__isset.hp_##field) {
\
_obj.__set_hp_##field({_hp});
\
} else {
\
_obj.hp_##field.push_back(_hp);
\
}
\
+ DCHECK_EQ(_obj.field.size(), _obj.hp_##field.size());
\
} while (0)
#define SET_IPS_AND_HOST_PORTS_BY_DNS_1(obj, field, hp1)
\
do {
\
auto &_obj = (obj);
\
- auto &_hp1 = (hp1);
\
+ const auto &_hp1 = (hp1);
\
_obj.field = {dsn::dns_resolver::instance().resolve_address(_hp1)};
\
_obj.__set_hp_##field({_hp1});
\
} while (0)
#define SET_IPS_AND_HOST_PORTS_BY_DNS_2(obj, field, hp1, hp2)
\
do {
\
auto &_obj = (obj);
\
- auto &_hp1 = (hp1);
\
- auto &_hp2 = (hp2);
\
+ const auto &_hp1 = (hp1);
\
+ const auto &_hp2 = (hp2);
\
_obj.field = {dsn::dns_resolver::instance().resolve_address(_hp1),
\
dsn::dns_resolver::instance().resolve_address(_hp2)};
\
_obj.__set_hp_##field({_hp1, _hp2});
\
@@ -150,9 +194,9 @@ class TProtocol;
#define SET_IPS_AND_HOST_PORTS_BY_DNS_3(obj, field, hp1, hp2, hp3)
\
do {
\
auto &_obj = (obj);
\
- auto &_hp1 = (hp1);
\
- auto &_hp2 = (hp2);
\
- auto &_hp3 = (hp3);
\
+ const auto &_hp1 = (hp1);
\
+ const auto &_hp2 = (hp2);
\
+ const auto &_hp3 = (hp3);
\
_obj.field = {dsn::dns_resolver::instance().resolve_address(_hp1),
\
dsn::dns_resolver::instance().resolve_address(_hp2),
\
dsn::dns_resolver::instance().resolve_address(_hp3)};
\
@@ -178,13 +222,14 @@ class TProtocol;
#define HEAD_INSERT_IP_AND_HOST_PORT_BY_DNS(obj, field, hp)
\
do {
\
auto &_obj = (obj);
\
- auto &_hp = (hp);
\
+ const auto &_hp = (hp);
\
_obj.field.insert(_obj.field.begin(),
dsn::dns_resolver::instance().resolve_address(_hp)); \
if (!_obj.__isset.hp_##field) {
\
_obj.__set_hp_##field({_hp});
\
} else {
\
_obj.hp_##field.insert(_obj.hp_##field.begin(), _hp);
\
}
\
+ DCHECK_EQ(_obj.field.size(), _obj.hp_##field.size());
\
} while (0)
// TODO(yingchun): the 'hp' can be reduced.
@@ -192,13 +237,17 @@ class TProtocol;
// maps are rpc_address and host_port type and indexed by 'addr' and 'hp',
respectively.
#define SET_VALUE_FROM_IP_AND_HOST_PORT(obj, field, addr, hp, value)
\
do {
\
+ const auto &_hp = (hp);
\
+ const auto &_addr = (addr);
\
+ DCHECK_EQ(_addr, dsn::dns_resolver::instance().resolve_address(_hp));
\
auto &_obj = (obj);
\
const auto &_value = (value);
\
- _obj.field[addr] = _value;
\
+ _obj.field[_addr] = _value;
\
if (!_obj.__isset.hp_##field) {
\
_obj.__set_hp_##field({});
\
}
\
- _obj.hp_##field[hp] = _value;
\
+ _obj.hp_##field[_hp] = _value;
\
+ DCHECK_EQ(_obj.field.size(), _obj.hp_##field.size());
\
} while (0)
// Set 'value' to the '<field>' map and optional 'hp_<field>' map of 'obj'.
The key of the
@@ -206,9 +255,9 @@ class TProtocol;
// 'addr', respectively.
#define SET_VALUE_FROM_HOST_PORT(obj, field, hp, value)
\
do {
\
- const auto &_hp = (hp);
\
- const auto addr = dsn::dns_resolver::instance().resolve_address(_hp);
\
- SET_VALUE_FROM_IP_AND_HOST_PORT(obj, field, addr, _hp, value);
\
+ const auto &__hp = (hp);
\
+ const auto addr = dsn::dns_resolver::instance().resolve_address(__hp);
\
+ SET_VALUE_FROM_IP_AND_HOST_PORT(obj, field, addr, __hp, value);
\
} while (0)
#define FMT_HOST_PORT_AND_IP(obj, field) fmt::format("{}({})",
(obj).hp_##field, (obj).field)
@@ -261,9 +310,6 @@ public:
uint32_t read(::apache::thrift::protocol::TProtocol *iprot);
uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const;
- static void fill_host_ports_from_addresses(const std::vector<rpc_address>
&addr_v,
- /*output*/
std::vector<host_port> &hp_v);
-
private:
friend class dns_resolver;
friend class rpc_group_host_port;
diff --git a/src/runtime/test/host_port_test.cpp
b/src/runtime/test/host_port_test.cpp
index f630a5aa5..a6e7334cf 100644
--- a/src/runtime/test/host_port_test.cpp
+++ b/src/runtime/test/host_port_test.cpp
@@ -28,6 +28,7 @@
#include "common/serialization_helper/dsn.layer2_types.h"
#include "fd_types.h"
#include "gtest/gtest.h"
+#include "meta_admin_types.h"
#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/group_address.h"
#include "runtime/rpc/group_host_port.h"
@@ -266,9 +267,11 @@ TEST(host_port_test, test_macros)
static const host_port kHp1("localhost", 8081);
static const host_port kHp2("localhost", 8082);
static const host_port kHp3("localhost", 8083);
+ static const std::vector<host_port> kHps({kHp1, kHp2, kHp3});
static const rpc_address kAddr1 =
dns_resolver::instance().resolve_address(kHp1);
static const rpc_address kAddr2 =
dns_resolver::instance().resolve_address(kHp2);
static const rpc_address kAddr3 =
dns_resolver::instance().resolve_address(kHp3);
+ static const std::vector<rpc_address> kAddres({kAddr1, kAddr2, kAddr3});
// Test GET_HOST_PORT-1.
{
@@ -291,6 +294,7 @@ TEST(host_port_test, test_macros)
{
fd::beacon_msg beacon;
host_port hp_from_node;
+ beacon.from_node = kAddr1;
beacon.__set_hp_from_node(kHp1);
GET_HOST_PORT(beacon, from_node, hp_from_node);
ASSERT_TRUE(hp_from_node);
@@ -298,6 +302,31 @@ TEST(host_port_test, test_macros)
ASSERT_EQ(kAddr1,
dns_resolver::instance().resolve_address(hp_from_node));
}
+ // Test GET_HOST_PORTS-1.
+ {
+ replication::configuration_recovery_request req;
+ std::vector<host_port> recovery_nodes;
+ GET_HOST_PORTS(req, recovery_nodes, recovery_nodes);
+ ASSERT_TRUE(recovery_nodes.empty());
+ }
+ // Test GET_HOST_PORTS-2.
+ {
+ replication::configuration_recovery_request req;
+ req.__set_recovery_nodes(kAddres);
+ std::vector<host_port> recovery_nodes;
+ GET_HOST_PORTS(req, recovery_nodes, recovery_nodes);
+ ASSERT_EQ(kHps, recovery_nodes);
+ }
+ // Test GET_HOST_PORTS-3.
+ {
+ replication::configuration_recovery_request req;
+ req.__set_recovery_nodes(kAddres);
+ req.__set_hp_recovery_nodes(kHps);
+ std::vector<host_port> recovery_nodes;
+ GET_HOST_PORTS(req, recovery_nodes, recovery_nodes);
+ ASSERT_EQ(kHps, recovery_nodes);
+ }
+
// Test SET_IP_AND_HOST_PORT.
{
fd::beacon_msg beacon;
diff --git a/src/shell/commands/recovery.cpp b/src/shell/commands/recovery.cpp
index fd8b7b826..b7c8ce106 100644
--- a/src/shell/commands/recovery.cpp
+++ b/src/shell/commands/recovery.cpp
@@ -181,8 +181,9 @@ dsn::host_port diagnose_recommend(const ddd_partition_info
&pinfo)
if (last_dropped.size() == 1) {
const ddd_node_info &ninfo = last_dropped.back();
- if (ninfo.last_committed_decree >= pinfo.config.last_committed_decree)
+ if (ninfo.last_committed_decree >= pinfo.config.last_committed_decree)
{
return ninfo.hp_node;
+ }
} else if (last_dropped.size() == 2) {
const ddd_node_info &secondary = last_dropped.front();
const ddd_node_info &latest = last_dropped.back();
@@ -192,16 +193,19 @@ dsn::host_port diagnose_recommend(const
ddd_partition_info &pinfo)
// - if last committed decree is the same, choose node with the
largest ballot
if (latest.last_committed_decree == secondary.last_committed_decree &&
- latest.last_committed_decree >= pinfo.config.last_committed_decree)
+ latest.last_committed_decree >=
pinfo.config.last_committed_decree) {
return latest.ballot >= secondary.ballot ? latest.hp_node :
secondary.hp_node;
+ }
if (latest.last_committed_decree > secondary.last_committed_decree &&
- latest.last_committed_decree >= pinfo.config.last_committed_decree)
+ latest.last_committed_decree >=
pinfo.config.last_committed_decree) {
return latest.hp_node;
+ }
if (secondary.last_committed_decree > latest.last_committed_decree &&
- secondary.last_committed_decree >=
pinfo.config.last_committed_decree)
+ secondary.last_committed_decree >=
pinfo.config.last_committed_decree) {
return secondary.hp_node;
+ }
}
return dsn::host_port();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]