This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new a684b43d9 refactor(FQDN): Add some convenient macros and update
idl/dsn.layer2.thrift related code (#1977)
a684b43d9 is described below
commit a684b43d99626667b9725833a4c102d9d8dd3805
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Apr 22 11:10:33 2024 +0800
refactor(FQDN): Add some convenient macros and update idl/dsn.layer2.thrift
related code (#1977)
- Add new convenient macros:
- SET_IP_AND_HOST_PORT_BY_DNS
- RESET_IP_AND_HOST_PORT
- CLEAR_IP_AND_HOST_PORT
- ADD_IP_AND_HOST_PORT
- ADD_IP_AND_HOST_PORT_BY_DNS
- SET_IPS_AND_HOST_PORTS_BY_DNS
- HEAD_INSERT_IP_AND_HOST_PORT_BY_DNS
- To ensure that every reference to an rpc_address field in
idl/dsn.layer2.thrift is read or written together with its related
host_port field, I used the following steps:
- Rename the rpc_address and host_port field names (e.g. append a `1`
character) in idl/dsn.layer2.thrift
- Update the C++ code until the project builds successfully
- Review all of the resulting changes, and use the macros to set/get the
rpc_address and host_port fields.
- Revert the renaming
- Build and test the project
---
src/client/partition_resolver_simple.cpp | 5 +-
src/meta/cluster_balance_policy.cpp | 7 +-
src/meta/duplication/meta_duplication_service.cpp | 13 +-
src/meta/meta_data.cpp | 18 +--
src/meta/meta_service.cpp | 3 +-
src/meta/partition_guardian.cpp | 10 +-
src/meta/server_state.cpp | 15 +-
src/meta/test/backup_test.cpp | 8 +-
.../test/balancer_simulator/balancer_simulator.cpp | 4 +-
src/meta/test/balancer_validator.cpp | 26 ++--
src/meta/test/cluster_balance_policy_test.cpp | 10 +-
src/meta/test/ford_fulkerson_test.cpp | 10 +-
src/meta/test/meta_bulk_load_ingestion_test.cpp | 6 +-
src/meta/test/meta_bulk_load_service_test.cpp | 37 ++---
src/meta/test/meta_data.cpp | 27 ++--
src/meta/test/meta_duplication_service_test.cpp | 16 +-
src/meta/test/meta_partition_guardian_test.cpp | 107 +++++--------
src/meta/test/misc/misc.cpp | 31 ++--
src/meta/test/state_sync_test.cpp | 20 +--
src/meta/test/update_configuration_test.cpp | 52 ++-----
src/replica/bulk_load/replica_bulk_loader.cpp | 22 ++-
.../bulk_load/test/replica_bulk_loader_test.cpp | 9 +-
src/replica/duplication/replica_follower.cpp | 5 +-
src/replica/duplication/replica_follower.h | 6 +-
.../duplication/test/replica_follower_test.cpp | 12 +-
src/replica/replica_config.cpp | 26 +---
src/replica/replica_stub.cpp | 3 +-
src/replica/split/replica_split_manager.cpp | 3 +-
src/replica/split/test/replica_split_test.cpp | 12 +-
src/runtime/rpc/rpc_host_port.h | 132 +++++++++++++++-
src/runtime/test/host_port_test.cpp | 166 +++++++++++++++++++++
src/server/hotspot_partition_calculator.cpp | 5 +-
32 files changed, 472 insertions(+), 354 deletions(-)
diff --git a/src/client/partition_resolver_simple.cpp
b/src/client/partition_resolver_simple.cpp
index ec70e764a..d57742563 100644
--- a/src/client/partition_resolver_simple.cpp
+++ b/src/client/partition_resolver_simple.cpp
@@ -304,11 +304,10 @@ void
partition_resolver_simple::query_config_reply(error_code err,
for (auto it = resp.partitions.begin(); it !=
resp.partitions.end(); ++it) {
auto &new_config = *it;
- LOG_DEBUG_PREFIX("query config reply, gpid = {}, ballot = {},
primary = {}({})",
+ LOG_DEBUG_PREFIX("query config reply, gpid = {}, ballot = {},
primary = {}",
new_config.pid,
new_config.ballot,
- new_config.hp_primary,
- new_config.primary);
+ FMT_HOST_PORT_AND_IP(new_config, primary));
auto it2 =
_config_cache.find(new_config.pid.get_partition_index());
if (it2 == _config_cache.end()) {
diff --git a/src/meta/cluster_balance_policy.cpp
b/src/meta/cluster_balance_policy.cpp
index 63c0377e3..516d78223 100644
--- a/src/meta/cluster_balance_policy.cpp
+++ b/src/meta/cluster_balance_policy.cpp
@@ -26,6 +26,9 @@
#include "dsn.layer2_types.h"
#include "meta/load_balance_policy.h"
+#include "runtime/rpc/dns_resolver.h" // IWYU pragma: keep
+#include "runtime/rpc/rpc_address.h"
+#include "runtime/rpc/rpc_host_port.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/utils.h"
@@ -542,10 +545,10 @@ bool cluster_balance_policy::apply_move(const move_info
&move,
it->second.erase(move.pid);
node_target.future_partitions.insert(move.pid);
- // add into migration list and selected_pid
+ // add into the migration list and selected_pid
partition_configuration pc;
pc.pid = move.pid;
- pc.hp_primary = primary_hp;
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, primary_hp);
list[move.pid] = generate_balancer_request(*_global_view->apps, pc,
move.type, source, target);
_migration_result->emplace(
move.pid, generate_balancer_request(*_global_view->apps, pc,
move.type, source, target));
diff --git a/src/meta/duplication/meta_duplication_service.cpp
b/src/meta/duplication/meta_duplication_service.cpp
index df5c4a548..6c4d1a1dd 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -518,17 +518,8 @@ void
meta_duplication_service::check_follower_app_if_create_completed(
const host_port secondary2("localhost", 34803);
partition_configuration p;
- p.primary =
dsn::dns_resolver::instance().resolve_address(primary);
- p.secondaries.emplace_back(
-
dsn::dns_resolver::instance().resolve_address(secondary1));
- p.secondaries.emplace_back(
-
dsn::dns_resolver::instance().resolve_address(secondary2));
-
- p.__set_hp_primary(primary);
- p.__set_hp_secondaries({});
- p.hp_secondaries.emplace_back(secondary1);
- p.hp_secondaries.emplace_back(secondary2);
-
+ SET_IP_AND_HOST_PORT_BY_DNS(p, primary, primary);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(p, secondaries,
secondary1, secondary2);
resp.partitions.emplace_back(p);
}
});
diff --git a/src/meta/meta_data.cpp b/src/meta/meta_data.cpp
index 610bd1898..029cca0e2 100644
--- a/src/meta/meta_data.cpp
+++ b/src/meta/meta_data.cpp
@@ -31,7 +31,7 @@
#include "common/replication_enums.h"
#include "meta_data.h"
#include "runtime/api_layer1.h"
-#include "runtime/rpc/dns_resolver.h"
+#include "runtime/rpc/dns_resolver.h" // IWYU pragma: keep
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_message.h"
#include "utils/flags.h"
@@ -114,8 +114,7 @@ bool construct_replica(meta_view view, const gpid &pid, int
max_replica_count)
invalid_ballot,
"the ballot of server must not be invalid_ballot, node = {}",
server.node);
- pc.primary = dsn::dns_resolver::instance().resolve_address(server.node);
- pc.__set_hp_primary(server.node);
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, server.node);
pc.ballot = server.ballot;
pc.partition_flags = 0;
pc.max_replica_count = max_replica_count;
@@ -138,9 +137,7 @@ bool construct_replica(meta_view view, const gpid &pid, int
max_replica_count)
if (pc.hp_last_drops.size() + 1 >= max_replica_count)
break;
// similar to cc.drop_list, pc.last_drop is also a stack structure
- pc.last_drops.insert(pc.last_drops.begin(),
-
dsn::dns_resolver::instance().resolve_address(iter->node));
- pc.hp_last_drops.insert(pc.hp_last_drops.begin(), iter->node);
+ HEAD_INSERT_IP_AND_HOST_PORT_BY_DNS(pc, last_drops, iter->node);
LOG_INFO("construct for ({}), select {} into last_drops, ballot({}), "
"committed_decree({}), prepare_decree({})",
pid,
@@ -529,14 +526,11 @@ app_state::app_state(const app_info &info) :
app_info(info), helpers(new app_sta
config.ballot = 0;
config.pid.set_app_id(app_id);
config.last_committed_decree = 0;
- config.last_drops.clear();
config.max_replica_count = app_info::max_replica_count;
- config.primary.set_invalid();
- config.secondaries.clear();
- config.__set_hp_primary(host_port());
- config.__set_hp_secondaries({});
- config.__set_hp_last_drops({});
+ RESET_IP_AND_HOST_PORT(config, primary);
+ CLEAR_IP_AND_HOST_PORT(config, secondaries);
+ CLEAR_IP_AND_HOST_PORT(config, last_drops);
partitions.assign(app_info::partition_count, config);
for (int i = 0; i != app_info::partition_count; ++i)
diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index 5e296ac17..00ca0882a 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -759,8 +759,7 @@ void
meta_service::on_query_configuration_by_index(configuration_query_by_index_
if (!check_status_and_authz(rpc, &forward_hp)) {
if (forward_hp) {
partition_configuration config;
- config.primary =
dsn::dns_resolver::instance().resolve_address(forward_hp);
- config.__set_hp_primary(forward_hp);
+ SET_IP_AND_HOST_PORT_BY_DNS(config, primary, forward_hp);
response.partitions.push_back(std::move(config));
}
return;
diff --git a/src/meta/partition_guardian.cpp b/src/meta/partition_guardian.cpp
index 9e4d6a135..6ecbda5c5 100644
--- a/src/meta/partition_guardian.cpp
+++ b/src/meta/partition_guardian.cpp
@@ -347,19 +347,17 @@ pc_status
partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi
break;
}
}
- LOG_INFO("{}: config_context.last_drops[{}({})]: node({}),
dropped_index({})",
+ LOG_INFO("{}: config_context.last_drops[{}]: node({}),
dropped_index({})",
gpid_name,
i,
- pc.hp_last_drops[i],
- pc.last_drops[i],
+ FMT_HOST_PORT_AND_IP(pc, last_drops[i]),
dropped_index);
}
if (pc.hp_last_drops.size() == 1) {
- LOG_WARNING("{}: the only node({}({})) is dead, waiting it to come
back",
+ LOG_WARNING("{}: the only node({}) is dead, waiting it to come
back",
gpid_name,
- pc.hp_last_drops.back(),
- pc.last_drops.back());
+ FMT_HOST_PORT_AND_IP(pc, last_drops.back()));
action.hp_node = pc.hp_last_drops.back();
action.node = pc.last_drops.back();
} else {
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index fd2701a9c..6e3924e89 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -570,6 +570,7 @@ dsn::error_code
server_state::sync_apps_from_remote_storage()
const blob &value)
mutable {
if (ec == ERR_OK) {
partition_configuration pc;
+ // TODO(yingchun): check if the fields will be set after
decoding.
pc.__isset.hp_secondaries = true;
pc.__isset.hp_last_drops = true;
pc.__isset.hp_primary = true;
@@ -1827,10 +1828,8 @@ void
server_state::drop_partition(std::shared_ptr<app_state> &app, int pidx)
if (pc.primary) {
maintain_drops(request.config.last_drops, pc.primary, request.type);
}
- request.config.primary.set_invalid();
- request.config.secondaries.clear();
- request.config.hp_primary.reset();
- request.config.hp_secondaries.clear();
+ RESET_IP_AND_HOST_PORT(request.config, primary);
+ CLEAR_IP_AND_HOST_PORT(request.config, secondaries);
CHECK_EQ((pc.partition_flags & pc_flags::dropped), 0);
request.config.partition_flags |= pc_flags::dropped;
@@ -1873,10 +1872,9 @@ void
server_state::downgrade_primary_to_inactive(std::shared_ptr<app_state> &app
return;
} else {
LOG_WARNING("gpid({}) is syncing another request with remote,
cancel it due to the "
- "primary({}({})) is down",
+ "primary({}) is down",
pc.pid,
- pc.hp_primary,
- pc.primary);
+ FMT_HOST_PORT_AND_IP(pc, primary));
cc.cancel_sync();
}
}
@@ -1890,8 +1888,7 @@ void
server_state::downgrade_primary_to_inactive(std::shared_ptr<app_state> &app
request.node = pc.primary;
request.__set_hp_node(pc.hp_primary);
request.config.ballot++;
- request.config.primary.set_invalid();
- request.config.__set_hp_primary(host_port());
+ RESET_IP_AND_HOST_PORT(request.config, primary);
maintain_drops(request.config.hp_last_drops, pc.hp_primary, request.type);
maintain_drops(request.config.last_drops, pc.primary, request.type);
diff --git a/src/meta/test/backup_test.cpp b/src/meta/test/backup_test.cpp
index 0ea2e4c75..f436514b6 100644
--- a/src/meta/test/backup_test.cpp
+++ b/src/meta/test/backup_test.cpp
@@ -43,7 +43,6 @@
#include "meta_service_test_app.h"
#include "meta_test_base.h"
#include "runtime/api_layer1.h"
-#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_holder.h"
#include "runtime/rpc/rpc_host_port.h"
@@ -506,11 +505,8 @@ TEST_F(policy_context_test, test_app_dropped_during_backup)
app_state *app = state->_all_apps[3].get();
app->status = dsn::app_status::AS_AVAILABLE;
for (partition_configuration &pc : app->partitions) {
- pc.primary =
dsn::dns_resolver::instance().resolve_address(node_list[0]);
- pc.secondaries =
{dsn::dns_resolver::instance().resolve_address(node_list[1]),
-
dsn::dns_resolver::instance().resolve_address(node_list[2])};
- pc.__set_hp_primary(node_list[0]);
- pc.__set_hp_secondaries({node_list[1], node_list[2]});
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, node_list[0]);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, node_list[1],
node_list[2]);
}
_mp._backup_history.clear();
diff --git a/src/meta/test/balancer_simulator/balancer_simulator.cpp
b/src/meta/test/balancer_simulator/balancer_simulator.cpp
index a70e8a14d..16b50407e 100644
--- a/src/meta/test/balancer_simulator/balancer_simulator.cpp
+++ b/src/meta/test/balancer_simulator/balancer_simulator.cpp
@@ -42,6 +42,8 @@
#include "meta/test/misc/misc.h"
#include "meta_admin_types.h"
#include "runtime/app_model.h"
+#include "runtime/rpc/dns_resolver.h" // IWYU pragma: keep
+#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/fmt_logging.h"
@@ -100,7 +102,7 @@ void generate_balanced_apps(/*out*/ app_mapper &apps,
for (dsn::partition_configuration &pc : the_app->partitions) {
const auto &n = pq1.pop();
nodes[n].put_partition(pc.pid, true);
- pc.hp_primary = n;
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, n);
pq1.push(n);
}
diff --git a/src/meta/test/balancer_validator.cpp
b/src/meta/test/balancer_validator.cpp
index a29675139..9a9d0599a 100644
--- a/src/meta/test/balancer_validator.cpp
+++ b/src/meta/test/balancer_validator.cpp
@@ -75,15 +75,14 @@ static void check_cure(app_mapper &apps, node_mapper
&nodes, ::dsn::partition_co
CHECK_EQ(nodes[act.hp_node].served_as(pc.pid),
partition_status::PS_INACTIVE);
nodes[act.hp_node].put_partition(pc.pid, true);
- pc.primary = act.node;
- pc.hp_primary = act.hp_node;
+ SET_IP_AND_HOST_PORT(pc, primary, act.node, act.hp_node);
break;
case config_type::CT_ADD_SECONDARY:
CHECK(!is_member(pc, act.hp_node), "");
CHECK_EQ(pc.hp_primary, act.hp_target);
CHECK(nodes.find(act.hp_node) != nodes.end(), "");
- pc.hp_secondaries.push_back(act.hp_node);
+ ADD_IP_AND_HOST_PORT(pc, secondaries, act.node, act.hp_node);
ns = &nodes[act.hp_node];
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE);
ns->put_partition(pc.pid, false);
@@ -98,8 +97,7 @@ static void check_cure(app_mapper &apps, node_mapper &nodes,
::dsn::partition_co
// test upgrade to primary
CHECK_EQ(nodes[pc.hp_primary].served_as(pc.pid),
partition_status::PS_PRIMARY);
nodes[pc.hp_primary].remove_partition(pc.pid, true);
- pc.primary.set_invalid();
- pc.hp_primary.reset();
+ RESET_IP_AND_HOST_PORT(pc, primary);
ps = guardian.cure({&apps, &nodes}, pc.pid, act);
CHECK_EQ(act.type, config_type::CT_UPGRADE_TO_PRIMARY);
@@ -109,8 +107,7 @@ static void check_cure(app_mapper &apps, node_mapper
&nodes, ::dsn::partition_co
CHECK(nodes.find(act.hp_node) != nodes.end(), "");
ns = &nodes[act.hp_node];
- pc.primary = act.node;
- pc.__set_hp_primary(act.hp_node);
+ SET_IP_AND_HOST_PORT(pc, primary, act.node, act.hp_node);
std::remove(pc.secondaries.begin(), pc.secondaries.end(), pc.primary);
std::remove(pc.hp_secondaries.begin(), pc.hp_secondaries.end(),
pc.hp_primary);
@@ -168,12 +165,11 @@ void meta_service_test_app::balancer_validator()
// now test the cure
::dsn::partition_configuration &pc = the_app->partitions[0];
nodes[pc.hp_primary].remove_partition(pc.pid, false);
- for (const auto &hp : pc.hp_secondaries)
+ for (const auto &hp : pc.hp_secondaries) {
nodes[hp].remove_partition(pc.pid, false);
- pc.primary.set_invalid();
- pc.secondaries.clear();
- pc.hp_primary.reset();
- pc.hp_secondaries.clear();
+ }
+ RESET_IP_AND_HOST_PORT(pc, primary);
+ CLEAR_IP_AND_HOST_PORT(pc, secondaries);
// cure test
check_cure(apps, nodes, pc);
@@ -212,10 +208,12 @@ static void load_apps_and_nodes(const char *file,
app_mapper &apps, node_mapper
int n;
infile >> n;
infile >> ip_port;
- app->partitions[j].hp_primary = host_port::from_string(ip_port);
+ const auto primary = host_port::from_string(ip_port);
+ SET_IP_AND_HOST_PORT_BY_DNS(app->partitions[j], primary, primary);
for (int k = 1; k < n; ++k) {
infile >> ip_port;
-
app->partitions[j].hp_secondaries.push_back(host_port::from_string(ip_port));
+ const auto secondary = host_port::from_string(ip_port);
+ ADD_IP_AND_HOST_PORT_BY_DNS(app->partitions[j], secondaries,
secondary);
}
}
}
diff --git a/src/meta/test/cluster_balance_policy_test.cpp
b/src/meta/test/cluster_balance_policy_test.cpp
index 8b4b8f6c7..14ae55632 100644
--- a/src/meta/test/cluster_balance_policy_test.cpp
+++ b/src/meta/test/cluster_balance_policy_test.cpp
@@ -36,6 +36,7 @@
#include "meta/meta_service.h"
#include "meta_admin_types.h"
#include "metadata_types.h"
+#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/defer.h"
#include "utils/fail_point.h"
@@ -118,7 +119,7 @@ TEST(cluster_balance_policy, get_app_migration_info)
info.app_name = appname;
info.partition_count = 1;
auto app = std::make_shared<app_state>(info);
- app->partitions[0].hp_primary = hp;
+ SET_IP_AND_HOST_PORT_BY_DNS(app->partitions[0], primary, hp);
node_state ns;
ns.set_hp(hp);
@@ -161,7 +162,7 @@ TEST(cluster_balance_policy, get_node_migration_info)
info.app_name = appname;
info.partition_count = 1;
auto app = std::make_shared<app_state>(info);
- app->partitions[0].hp_primary = hp;
+ SET_IP_AND_HOST_PORT_BY_DNS(app->partitions[0], primary, hp);
serving_replica sr;
sr.node = hp;
std::string disk_tag = "disk1";
@@ -514,9 +515,8 @@ TEST(cluster_balance_policy, calc_potential_moving)
info.partition_count = 4;
std::shared_ptr<app_state> app = app_state::create(info);
partition_configuration pc;
- pc.hp_primary = hp1;
- pc.hp_secondaries.push_back(hp2);
- pc.hp_secondaries.push_back(hp3);
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, hp1);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, hp2, hp3);
app->partitions[0] = pc;
app->partitions[1] = pc;
diff --git a/src/meta/test/ford_fulkerson_test.cpp
b/src/meta/test/ford_fulkerson_test.cpp
index 1d3084e3f..bb291c8bc 100644
--- a/src/meta/test/ford_fulkerson_test.cpp
+++ b/src/meta/test/ford_fulkerson_test.cpp
@@ -27,6 +27,8 @@
#include "gtest/gtest.h"
#include "meta/load_balance_policy.h"
#include "meta/meta_data.h"
+#include "runtime/rpc/dns_resolver.h" // IWYU pragma: keep
+#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
namespace dsn {
@@ -96,8 +98,7 @@ TEST(ford_fulkerson, update_decree)
info.partition_count = 1;
std::shared_ptr<app_state> app = app_state::create(info);
partition_configuration pc;
- pc.hp_secondaries.push_back(hp2);
- pc.hp_secondaries.push_back(hp3);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, hp2, hp3);
app->partitions.push_back(pc);
app->partitions.push_back(pc);
@@ -134,9 +135,8 @@ TEST(ford_fulkerson, find_shortest_path)
std::shared_ptr<app_state> app = app_state::create(info);
partition_configuration pc;
- pc.hp_primary = hp1;
- pc.hp_secondaries.push_back(hp2);
- pc.hp_secondaries.push_back(hp3);
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, hp1);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, hp2, hp3);
app->partitions[0] = pc;
app->partitions[1] = pc;
diff --git a/src/meta/test/meta_bulk_load_ingestion_test.cpp
b/src/meta/test/meta_bulk_load_ingestion_test.cpp
index daed55067..8d5e434dc 100644
--- a/src/meta/test/meta_bulk_load_ingestion_test.cpp
+++ b/src/meta/test/meta_bulk_load_ingestion_test.cpp
@@ -27,6 +27,7 @@
#include "meta/meta_bulk_load_ingestion_context.h"
#include "meta/meta_data.h"
#include "meta_test_base.h"
+#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/fail_point.h"
@@ -229,9 +230,8 @@ public:
config_context &cc)
{
config.pid = gpid(APP_ID, pidx);
- config.hp_primary = nodes[0];
- config.hp_secondaries.emplace_back(nodes[1]);
- config.hp_secondaries.emplace_back(nodes[2]);
+ SET_IP_AND_HOST_PORT_BY_DNS(config, primary, nodes[0]);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(config, secondaries, nodes[1], nodes[2]);
auto count = nodes.size();
for (auto i = 0; i < count; i++) {
diff --git a/src/meta/test/meta_bulk_load_service_test.cpp
b/src/meta/test/meta_bulk_load_service_test.cpp
index ad61b6cd8..2cd4e3c64 100644
--- a/src/meta/test/meta_bulk_load_service_test.cpp
+++ b/src/meta/test/meta_bulk_load_service_test.cpp
@@ -178,13 +178,8 @@ public:
config.pid = gpid(app->app_id, 0);
config.max_replica_count = 3;
config.ballot = BALLOT;
- config.primary = PRIMARY;
- config.secondaries.emplace_back(SECONDARY1);
- config.secondaries.emplace_back(SECONDARY2);
- config.hp_primary = PRIMARY_HP;
- config.__set_hp_secondaries(std::vector<host_port>());
- config.hp_secondaries.emplace_back(SECONDARY1_HP);
- config.hp_secondaries.emplace_back(SECONDARY2_HP);
+ SET_IP_AND_HOST_PORT_BY_DNS(config, primary, PRIMARY_HP);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(config, secondaries, SECONDARY1_HP,
SECONDARY2_HP);
app->partitions.clear();
app->partitions.emplace_back(config);
mock_meta_bulk_load_context(app->app_id, app->partition_count, status);
@@ -199,12 +194,10 @@ public:
{
std::shared_ptr<app_state> app = find_app(name);
if (mock_primary_invalid) {
- app->partitions[pid.get_partition_index()].primary.set_invalid();
- app->partitions[pid.get_partition_index()].hp_primary.reset();
+ RESET_IP_AND_HOST_PORT(app->partitions[pid.get_partition_index()],
primary);
}
if (mock_lack_secondary) {
- app->partitions[pid.get_partition_index()].secondaries.clear();
- app->partitions[pid.get_partition_index()].hp_secondaries.clear();
+ CLEAR_IP_AND_HOST_PORT(app->partitions[pid.get_partition_index()],
secondaries);
}
partition_configuration pconfig;
bool flag = bulk_svc().check_partition_status(
@@ -242,25 +235,17 @@ public:
set_partition_bulk_load_info(pid, ever_ingest_succeed);
partition_configuration config;
config.pid = pid;
- config.primary = PRIMARY;
- config.__set_hp_primary(PRIMARY_HP);
- config.__set_hp_secondaries(std::vector<host_port>());
+ SET_IP_AND_HOST_PORT_BY_DNS(config, primary, PRIMARY_HP);
if (same) {
- config.secondaries.emplace_back(SECONDARY1);
- config.secondaries.emplace_back(SECONDARY2);
- config.hp_secondaries.emplace_back(SECONDARY1_HP);
- config.hp_secondaries.emplace_back(SECONDARY2_HP);
+ ADD_IP_AND_HOST_PORT_BY_DNS(config, secondaries, SECONDARY1_HP);
+ ADD_IP_AND_HOST_PORT_BY_DNS(config, secondaries, SECONDARY2_HP);
} else {
- config.secondaries.emplace_back(SECONDARY1);
- config.hp_secondaries.emplace_back(SECONDARY1_HP);
+ ADD_IP_AND_HOST_PORT_BY_DNS(config, secondaries, SECONDARY1_HP);
if (secondary_count == 2) {
- config.secondaries.emplace_back(SECONDARY3);
- config.hp_secondaries.emplace_back(SECONDARY3_HP);
+ ADD_IP_AND_HOST_PORT_BY_DNS(config, secondaries,
SECONDARY3_HP);
} else if (secondary_count >= 3) {
- config.secondaries.emplace_back(SECONDARY2);
- config.secondaries.emplace_back(SECONDARY3);
- config.hp_secondaries.emplace_back(SECONDARY2_HP);
- config.hp_secondaries.emplace_back(SECONDARY3_HP);
+ ADD_IP_AND_HOST_PORT_BY_DNS(config, secondaries,
SECONDARY2_HP);
+ ADD_IP_AND_HOST_PORT_BY_DNS(config, secondaries,
SECONDARY3_HP);
}
}
auto flag = bulk_svc().check_ever_ingestion_succeed(config, APP_NAME,
pid);
diff --git a/src/meta/test/meta_data.cpp b/src/meta/test/meta_data.cpp
index 2f28bd793..c32f33e86 100644
--- a/src/meta/test/meta_data.cpp
+++ b/src/meta/test/meta_data.cpp
@@ -35,7 +35,7 @@
#include "meta/meta_data.h"
#include "metadata_types.h"
#include "misc/misc.h"
-#include "runtime/rpc/dns_resolver.h"
+#include "runtime/rpc/dns_resolver.h" // IWYU pragma: keep
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
@@ -136,12 +136,9 @@ TEST(meta_data, collect_replica)
#define CLEAR_REPLICA
\
do {
\
- pc.__set_hp_primary(dsn::host_port());
\
- pc.__set_hp_secondaries({});
\
- pc.__set_hp_last_drops({});
\
- pc.primary.set_invalid();
\
- pc.secondaries.clear();
\
- pc.last_drops.clear();
\
+ RESET_IP_AND_HOST_PORT(pc, primary);
\
+ CLEAR_IP_AND_HOST_PORT(pc, secondaries);
\
+ CLEAR_IP_AND_HOST_PORT(pc, last_drops);
\
} while (false)
#define CLEAR_DROP_LIST
\
@@ -153,22 +150,19 @@ TEST(meta_data, collect_replica)
CLEAR_REPLICA;
\
CLEAR_DROP_LIST
- const auto addr =
dsn::dns_resolver::instance().resolve_address(node_list[0]);
{
// replica is primary of partition
CLEAR_ALL;
rep.ballot = 10;
pc.ballot = 9;
- pc.primary = addr;
- pc.__set_hp_primary(node_list[0]);
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, node_list[0]);
ASSERT_TRUE(collect_replica(view, node_list[0], rep));
}
{
// replica is secondary of partition
CLEAR_ALL;
- pc.secondaries.push_back(addr);
- pc.hp_secondaries.push_back(node_list[0]);
+ ADD_IP_AND_HOST_PORT_BY_DNS(pc, secondaries, node_list[0]);
ASSERT_TRUE(collect_replica(view, node_list[0], rep));
}
@@ -385,12 +379,9 @@ TEST(meta_data, construct_replica)
#define CLEAR_REPLICA
\
do {
\
- pc.hp_primary.reset();
\
- pc.hp_secondaries.clear();
\
- pc.hp_last_drops.clear();
\
- pc.__set_hp_primary(dsn::host_port());
\
- pc.__set_hp_secondaries({});
\
- pc.__set_hp_last_drops({});
\
+ RESET_IP_AND_HOST_PORT(pc, primary);
\
+ CLEAR_IP_AND_HOST_PORT(pc, secondaries);
\
+ CLEAR_IP_AND_HOST_PORT(pc, last_drops);
\
} while (false)
#define CLEAR_DROP_LIST
\
diff --git a/src/meta/test/meta_duplication_service_test.cpp
b/src/meta/test/meta_duplication_service_test.cpp
index 4e6b05656..df0a34d99 100644
--- a/src/meta/test/meta_duplication_service_test.cpp
+++ b/src/meta/test/meta_duplication_service_test.cpp
@@ -587,9 +587,6 @@ TEST_F(meta_duplication_service_test, remove_dup)
TEST_F(meta_duplication_service_test, duplication_sync)
{
const auto &server_nodes = ensure_enough_alive_nodes(3);
- const auto &node = server_nodes[0];
- const auto &addr =
dsn::dns_resolver::instance().resolve_address(server_nodes[0]);
-
const std::string test_app = "test_app_0";
create_app(test_app);
auto app = find_app(test_app);
@@ -597,14 +594,14 @@ TEST_F(meta_duplication_service_test, duplication_sync)
// generate all primaries on node[0]
for (partition_configuration &pc : app->partitions) {
pc.ballot = random32(1, 10000);
- pc.primary = addr;
- pc.__set_hp_primary(server_nodes[0]);
- pc.hp_secondaries.push_back(server_nodes[1]);
- pc.hp_secondaries.push_back(server_nodes[2]);
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, server_nodes[0]);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, server_nodes[1],
server_nodes[2]);
}
initialize_node_state();
+ const auto &node = server_nodes[0];
+ const auto &addr =
dsn::dns_resolver::instance().resolve_address(server_nodes[0]);
const dupid_t dupid = create_dup(test_app).dupid;
auto dup = app->duplications[dupid];
for (int i = 0; i < app->partition_count; i++) {
@@ -818,12 +815,11 @@ TEST_F(meta_duplication_service_test, fail_mode)
// ensure dup_sync will synchronize fail_mode
const auto hp = generate_node_list(3)[0];
- const auto addr = dsn::dns_resolver::instance().resolve_address(hp);
for (partition_configuration &pc : app->partitions) {
- pc.primary = addr;
- pc.__set_hp_primary(hp);
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, hp);
}
initialize_node_state();
+ const auto addr = dsn::dns_resolver::instance().resolve_address(hp);
auto sync_resp = duplication_sync(addr, hp, {});
ASSERT_TRUE(sync_resp.dup_map[app->app_id][dup->id].__isset.fail_mode);
ASSERT_EQ(sync_resp.dup_map[app->app_id][dup->id].fail_mode,
duplication_fail_mode::FAIL_SKIP);
diff --git a/src/meta/test/meta_partition_guardian_test.cpp
b/src/meta/test/meta_partition_guardian_test.cpp
index e59e768ab..0281d8038 100644
--- a/src/meta/test/meta_partition_guardian_test.cpp
+++ b/src/meta/test/meta_partition_guardian_test.cpp
@@ -54,7 +54,6 @@
#include "meta_service_test_app.h"
#include "meta_test_base.h"
#include "metadata_types.h"
-#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/rpc/rpc_message.h"
@@ -79,24 +78,21 @@ static void apply_update_request(/*in-out*/
configuration_update_request &update
switch (update_req.type) {
case config_type::CT_ASSIGN_PRIMARY:
case config_type::CT_UPGRADE_TO_PRIMARY:
- pc.primary = update_req.node;
- pc.__set_hp_primary(update_req.hp_node);
+ SET_IP_AND_HOST_PORT(pc, primary, update_req.node, update_req.hp_node);
replica_helper::remove_node(update_req.node, pc.secondaries);
replica_helper::remove_node(update_req.hp_node, pc.hp_secondaries);
break;
case config_type::CT_ADD_SECONDARY:
case config_type::CT_ADD_SECONDARY_FOR_LB:
- pc.secondaries.push_back(update_req.node);
- pc.hp_secondaries.push_back(update_req.hp_node);
+ ADD_IP_AND_HOST_PORT(pc, secondaries, update_req.node,
update_req.hp_node);
update_req.type = config_type::CT_UPGRADE_TO_SECONDARY;
break;
case config_type::CT_REMOVE:
case config_type::CT_DOWNGRADE_TO_INACTIVE:
if (update_req.hp_node == pc.hp_primary) {
- pc.primary.set_invalid();
- pc.hp_primary.reset();
+ RESET_IP_AND_HOST_PORT(pc, primary);
} else {
replica_helper::remove_node(update_req.node, pc.secondaries);
replica_helper::remove_node(update_req.hp_node, pc.hp_secondaries);
@@ -104,10 +100,8 @@ static void apply_update_request(/*in-out*/
configuration_update_request &update
break;
case config_type::CT_DOWNGRADE_TO_SECONDARY:
- pc.secondaries.push_back(pc.primary);
- pc.hp_secondaries.push_back(pc.hp_primary);
- pc.primary.set_invalid();
- pc.hp_primary.reset();
+ ADD_IP_AND_HOST_PORT(pc, secondaries, pc.primary, pc.hp_primary);
+ RESET_IP_AND_HOST_PORT(pc, primary);
break;
default:
break;
@@ -220,11 +214,8 @@ void meta_partition_guardian_test::cure_test()
std::cerr << "Case: upgrade secondary to primary, and message lost" <<
std::endl;
// initialize
state->_nodes.clear();
- pc.primary.set_invalid();
- pc.hp_primary.reset();
- pc.secondaries = {dsn::dns_resolver::instance().resolve_address(nodes[0]),
- dsn::dns_resolver::instance().resolve_address(nodes[1])};
- pc.__set_hp_secondaries({nodes[0], nodes[1]});
+ RESET_IP_AND_HOST_PORT(pc, primary);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, nodes[0], nodes[1]);
pc.ballot = 1;
state->initialize_node_state();
svc->set_node_state(nodes, true);
@@ -283,11 +274,8 @@ void meta_partition_guardian_test::cure_test()
std::cerr << "Case: upgrade secondary to primary, and the candidate died"
<< std::endl;
// initialize
state->_nodes.clear();
- pc.primary.set_invalid();
- pc.hp_primary.reset();
- pc.secondaries = {dsn::dns_resolver::instance().resolve_address(nodes[0]),
- dsn::dns_resolver::instance().resolve_address(nodes[1])};
- pc.__set_hp_secondaries({nodes[0], nodes[1]});
+ RESET_IP_AND_HOST_PORT(pc, primary);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, nodes[0], nodes[1]);
pc.ballot = 1;
state->initialize_node_state();
svc->set_node_state(nodes, true);
@@ -347,10 +335,8 @@ void meta_partition_guardian_test::cure_test()
std::cerr << "Case: add secondary, and the message lost" << std::endl;
// initialize
state->_nodes.clear();
- pc.primary = dsn::dns_resolver::instance().resolve_address(nodes[0]);
- pc.secondaries = {dsn::dns_resolver::instance().resolve_address(nodes[1])};
- pc.__set_hp_primary(nodes[0]);
- pc.__set_hp_secondaries({nodes[1]});
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes[0]);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, nodes[1]);
pc.ballot = 1;
state->initialize_node_state();
svc->set_node_state(nodes, true);
@@ -408,10 +394,8 @@ void meta_partition_guardian_test::cure_test()
std::cerr << "Case: add secondary, but the primary is removing another" <<
std::endl;
// initialize
state->_nodes.clear();
- pc.primary = dsn::dns_resolver::instance().resolve_address(nodes[0]);
- pc.secondaries = {dsn::dns_resolver::instance().resolve_address(nodes[1])};
- pc.__set_hp_primary(nodes[0]);
- pc.__set_hp_secondaries({nodes[1]});
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes[0]);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, nodes[1]);
pc.ballot = 1;
state->initialize_node_state();
svc->set_node_state(nodes, true);
@@ -432,8 +416,7 @@ void meta_partition_guardian_test::cure_test()
update_req->type = config_type::CT_DOWNGRADE_TO_INACTIVE;
update_req->node = update_req->config.secondaries[0];
update_req->hp_node = update_req->config.hp_secondaries[0];
- update_req->config.secondaries.clear();
- update_req->config.hp_secondaries.clear();
+ CLEAR_IP_AND_HOST_PORT(update_req->config, secondaries);
proposal_sent = true;
@@ -453,10 +436,8 @@ void meta_partition_guardian_test::cure_test()
std::cerr << "Case: add secondary, and the added secondary is dead" <<
std::endl;
// initialize
state->_nodes.clear();
- pc.primary = dsn::dns_resolver::instance().resolve_address(nodes[0]);
- pc.secondaries = {dsn::dns_resolver::instance().resolve_address(nodes[1])};
- pc.__set_hp_primary(nodes[0]);
- pc.__set_hp_secondaries({nodes[1]});
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes[0]);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, nodes[1]);
pc.ballot = 1;
state->initialize_node_state();
svc->set_node_state(nodes, true);
@@ -517,10 +498,8 @@ void meta_partition_guardian_test::cure_test()
std::cerr << "Case: add secondary, and the primary is dead" << std::endl;
// initialize
state->_nodes.clear();
- pc.primary = dsn::dns_resolver::instance().resolve_address(nodes[0]);
- pc.__set_hp_primary(nodes[0]);
- pc.secondaries = {dsn::dns_resolver::instance().resolve_address(nodes[1])};
- pc.__set_hp_secondaries({nodes[1]});
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes[0]);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, nodes[1]);
pc.ballot = 1;
state->initialize_node_state();
svc->set_node_state(nodes, true);
@@ -553,13 +532,9 @@ void meta_partition_guardian_test::cure_test()
std::this_thread::sleep_for(std::chrono::milliseconds(500));
state->_nodes.clear();
- pc.primary.set_invalid();
- pc.hp_primary.reset();
- pc.hp_secondaries.clear();
- pc.last_drops = {dsn::dns_resolver::instance().resolve_address(nodes[0]),
- dsn::dns_resolver::instance().resolve_address(nodes[1]),
- dsn::dns_resolver::instance().resolve_address(nodes[2])};
- pc.__set_hp_last_drops({nodes[0], nodes[1], nodes[2]});
+ RESET_IP_AND_HOST_PORT(pc, primary);
+ CLEAR_IP_AND_HOST_PORT(pc, secondaries);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, last_drops, nodes[0], nodes[1],
nodes[2]);
pc.ballot = 4;
state->initialize_node_state();
svc->set_node_state(nodes, true);
@@ -669,13 +644,9 @@ void meta_partition_guardian_test::cure_test()
get_node_state(state->_nodes, nodes[1],
false)->set_replicas_collect_flag(true);
get_node_state(state->_nodes, nodes[2],
false)->set_replicas_collect_flag(true);
- pc.primary.set_invalid();
- pc.hp_primary.reset();
- pc.hp_secondaries.clear();
- pc.last_drops = {dsn::dns_resolver::instance().resolve_address(nodes[0]),
- dsn::dns_resolver::instance().resolve_address(nodes[1]),
- dsn::dns_resolver::instance().resolve_address(nodes[2])};
- pc.__set_hp_last_drops({nodes[0], nodes[1], nodes[2]});
+ RESET_IP_AND_HOST_PORT(pc, primary);
+ CLEAR_IP_AND_HOST_PORT(pc, secondaries);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, last_drops, nodes[0], nodes[1],
nodes[2]);
t = dsn::tasking::enqueue(LPC_META_STATE_NORMAL,
nullptr,
@@ -769,11 +740,9 @@ void meta_partition_guardian_test::cure_test()
return update_req;
});
- pc.primary.set_invalid();
- pc.hp_primary.reset();
- pc.hp_secondaries.clear();
- pc.last_drops = {dsn::dns_resolver::instance().resolve_address(nodes[0])};
- pc.__set_hp_last_drops({nodes[0]});
+ RESET_IP_AND_HOST_PORT(pc, primary);
+ CLEAR_IP_AND_HOST_PORT(pc, secondaries);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, last_drops, nodes[0]);
state->_nodes.clear();
pc.ballot = 1;
state->initialize_node_state();
@@ -926,43 +895,37 @@ void meta_partition_guardian_test::from_proposal_test()
std::cerr << "Case 6: test invalid proposal: already have priamry but
assign" << std::endl;
cpa2 = new_proposal_action(nodes_list[0], nodes_list[0],
config_type::CT_ASSIGN_PRIMARY);
cc.lb_actions.assign_balancer_proposals({cpa2});
- pc.primary = dsn::dns_resolver::instance().resolve_address(nodes_list[1]);
- pc.__set_hp_primary(nodes_list[1]);
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes_list[1]);
ASSERT_FALSE(guardian.from_proposals(mv, p, cpa));
ASSERT_EQ(config_type::CT_INVALID, cpa.type);
std::cerr << "Case 7: test invalid proposal: upgrade non-secondary" <<
std::endl;
cpa2 = new_proposal_action(nodes_list[0], nodes_list[0],
config_type::CT_UPGRADE_TO_PRIMARY);
cc.lb_actions.assign_balancer_proposals({cpa2});
- pc.primary.set_invalid();
- pc.hp_primary.reset();
+ RESET_IP_AND_HOST_PORT(pc, primary);
ASSERT_FALSE(guardian.from_proposals(mv, p, cpa));
ASSERT_EQ(config_type::CT_INVALID, cpa.type);
std::cerr << "Case 8: test invalid proposal: add exist secondary" <<
std::endl;
cpa2 = new_proposal_action(nodes_list[0], nodes_list[1],
config_type::CT_ADD_SECONDARY);
cc.lb_actions.assign_balancer_proposals({cpa2});
- pc.primary = dsn::dns_resolver::instance().resolve_address(nodes_list[1]);
- pc.__set_hp_primary(nodes_list[1]);
- pc.secondaries =
{dsn::dns_resolver::instance().resolve_address(nodes_list[1])};
- pc.__set_hp_secondaries({nodes_list[1]});
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes_list[1]);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, nodes_list[1]);
ASSERT_FALSE(guardian.from_proposals(mv, p, cpa));
ASSERT_EQ(config_type::CT_INVALID, cpa.type);
std::cerr << "Case 9: test invalid proposal: downgrade non member" <<
std::endl;
cpa2 = new_proposal_action(nodes_list[0], nodes_list[1],
config_type::CT_REMOVE);
cc.lb_actions.assign_balancer_proposals({cpa2});
- pc.primary = dsn::dns_resolver::instance().resolve_address(nodes_list[0]);
- pc.__set_hp_primary(nodes_list[0]);
- pc.hp_secondaries.clear();
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes_list[0]);
+ CLEAR_IP_AND_HOST_PORT(pc, secondaries);
ASSERT_FALSE(guardian.from_proposals(mv, p, cpa));
ASSERT_EQ(config_type::CT_INVALID, cpa.type);
std::cerr << "Case 10: test abnormal learning detect" << std::endl;
cpa2 = new_proposal_action(nodes_list[0], nodes_list[1],
config_type::CT_ADD_SECONDARY);
- pc.primary = dsn::dns_resolver::instance().resolve_address(nodes_list[0]);
- pc.__set_hp_primary(nodes_list[0]);
- pc.hp_secondaries.clear();
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes_list[0]);
+ CLEAR_IP_AND_HOST_PORT(pc, secondaries);
cc.lb_actions.assign_balancer_proposals({cpa2});
replica_info i;
diff --git a/src/meta/test/misc/misc.cpp b/src/meta/test/misc/misc.cpp
index c191b8be9..f3dcdaf36 100644
--- a/src/meta/test/misc/misc.cpp
+++ b/src/meta/test/misc/misc.cpp
@@ -47,7 +47,7 @@
#include "duplication_types.h"
#include "meta_admin_types.h"
#include "metadata_types.h"
-#include "runtime/rpc/dns_resolver.h"
+#include "runtime/rpc/dns_resolver.h" // IWYU pragma: keep
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "utils/fmt_logging.h"
@@ -125,14 +125,10 @@ void generate_app(/*out*/ std::shared_ptr<app_state> &app,
indices[2] = random32(indices[1] + 1, node_list.size() - 1);
int p = random32(0, 2);
- pc.__set_hp_primary(node_list[indices[p]]);
- pc.__set_hp_secondaries({});
- pc.primary =
dsn::dns_resolver::instance().resolve_address(node_list[indices[p]]);
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, node_list[indices[p]]);
for (unsigned int i = 0; i != indices.size(); ++i) {
if (i != p) {
- pc.secondaries.push_back(
-
dsn::dns_resolver::instance().resolve_address(node_list[indices[i]]));
- pc.hp_secondaries.push_back(node_list[indices[i]]);
+ ADD_IP_AND_HOST_PORT_BY_DNS(pc, secondaries,
node_list[indices[i]]);
}
}
@@ -314,8 +310,7 @@ void proposal_action_check_and_apply(const
configuration_proposal_action &act,
CHECK(pc.hp_secondaries.empty(), "");
CHECK(pc.secondaries.empty(), "");
- pc.primary = act.node;
- pc.__set_hp_primary(hp_node);
+ SET_IP_AND_HOST_PORT(pc, primary, act.node, hp_node);
ns = &nodes[hp_node];
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE);
ns->put_partition(pc.pid, true);
@@ -326,8 +321,7 @@ void proposal_action_check_and_apply(const
configuration_proposal_action &act,
CHECK_EQ(act.target, pc.primary);
CHECK(!is_member(pc, hp_node), "");
- pc.hp_secondaries.push_back(hp_node);
- pc.secondaries.push_back(act.node);
+ ADD_IP_AND_HOST_PORT(pc, secondaries, act.node, hp_node);
ns = &nodes[hp_node];
CHECK_EQ(ns->served_as(pc.pid), partition_status::PS_INACTIVE);
ns->put_partition(pc.pid, false);
@@ -342,10 +336,8 @@ void proposal_action_check_and_apply(const
configuration_proposal_action &act,
CHECK(nodes.find(hp_node) != nodes.end(), "");
CHECK(!is_secondary(pc, pc.hp_primary), "");
nodes[hp_node].remove_partition(pc.pid, true);
- pc.secondaries.push_back(pc.primary);
- pc.hp_secondaries.push_back(pc.hp_primary);
- pc.primary.set_invalid();
- pc.__set_hp_primary(dsn::host_port());
+ ADD_IP_AND_HOST_PORT(pc, secondaries, pc.primary, pc.hp_primary);
+ RESET_IP_AND_HOST_PORT(pc, primary);
break;
case config_type::CT_UPGRADE_TO_PRIMARY:
@@ -357,8 +349,7 @@ void proposal_action_check_and_apply(const
configuration_proposal_action &act,
CHECK(nodes.find(hp_node) != nodes.end(), "");
ns = &nodes[hp_node];
- pc.hp_primary = hp_node;
- pc.primary = act.node;
+ SET_IP_AND_HOST_PORT(pc, primary, act.node, hp_node);
CHECK(replica_helper::remove_node(hp_node, pc.hp_secondaries), "");
CHECK(replica_helper::remove_node(act.node, pc.secondaries), "");
ns->put_partition(pc.pid, true);
@@ -370,11 +361,7 @@ void proposal_action_check_and_apply(const
configuration_proposal_action &act,
CHECK(!is_member(pc, hp_node), "");
CHECK(act.hp_node, "");
CHECK(act.node, "");
- if (!pc.__isset.hp_secondaries) {
- pc.__set_hp_secondaries({});
- }
- pc.hp_secondaries.push_back(hp_node);
- pc.secondaries.push_back(act.node);
+ ADD_IP_AND_HOST_PORT(pc, secondaries, act.node, hp_node);
ns = &nodes[hp_node];
ns->put_partition(pc.pid, false);
diff --git a/src/meta/test/state_sync_test.cpp
b/src/meta/test/state_sync_test.cpp
index a09b2b90b..7c08181a8 100644
--- a/src/meta/test/state_sync_test.cpp
+++ b/src/meta/test/state_sync_test.cpp
@@ -46,7 +46,6 @@
#include "meta/test/misc/misc.h"
#include "meta_admin_types.h"
#include "meta_service_test_app.h"
-#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/task/task.h"
@@ -55,6 +54,7 @@
#include "utils/flags.h"
#include "utils/strings.h"
#include "utils/utils.h"
+#include "utils/test_macros.h"
DSN_DECLARE_string(cluster_root);
DSN_DECLARE_string(meta_state_service_type);
@@ -83,22 +83,14 @@ static void
random_assign_partition_config(std::shared_ptr<app_state> &app,
start = indices.back() + 1;
}
const auto &primary = get_server(indices[0]);
- pc.primary = dsn::dns_resolver::instance().resolve_address(primary);
- pc.__set_hp_primary(primary);
- if (!pc.__isset.hp_secondaries) {
- pc.__set_hp_secondaries({});
- }
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, primary);
for (int i = 1; i < indices.size(); ++i) {
const auto &secondary = get_server(indices[i]);
if (secondary) {
-
pc.secondaries.push_back(dsn::dns_resolver::instance().resolve_address(secondary));
- pc.hp_secondaries.push_back(secondary);
+ ADD_IP_AND_HOST_PORT_BY_DNS(pc, secondaries, secondary);
}
}
- const auto hp = server_list.back();
- const auto addr = dsn::dns_resolver::instance().resolve_address(hp);
- pc.__set_hp_last_drops({hp});
- pc.last_drops = {addr};
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, last_drops, server_list.back());
}
}
@@ -254,7 +246,7 @@ void meta_service_test_app::state_sync_test()
dsn::error_code ec = ss2->initialize_data_structure();
ASSERT_EQ(ec, dsn::ERR_OK);
- app_mapper_compare(ss1->_all_apps, ss2->_all_apps);
+ NO_FATALS(app_mapper_compare(ss1->_all_apps, ss2->_all_apps));
ASSERT_EQ(ss1->_exist_apps.size(), ss2->_exist_apps.size());
for (const auto &iter : ss1->_exist_apps) {
ASSERT_TRUE(ss2->_exist_apps.find(iter.first) !=
ss2->_exist_apps.end());
@@ -275,7 +267,7 @@ void meta_service_test_app::state_sync_test()
dsn::error_code ec =
ss2->restore_from_local_storage("meta_state.dump3");
ASSERT_EQ(ec, dsn::ERR_OK);
- app_mapper_compare(ss1->_all_apps, ss2->_all_apps);
+ NO_FATALS(app_mapper_compare(ss1->_all_apps, ss2->_all_apps));
ASSERT_TRUE(ss1->_exist_apps.size() == ss2->_exist_apps.size());
for (const auto &iter : ss1->_exist_apps) {
ASSERT_TRUE(ss2->_exist_apps.find(iter.first) !=
ss2->_exist_apps.end());
diff --git a/src/meta/test/update_configuration_test.cpp
b/src/meta/test/update_configuration_test.cpp
index 4f47b65c6..c824fe561 100644
--- a/src/meta/test/update_configuration_test.cpp
+++ b/src/meta/test/update_configuration_test.cpp
@@ -107,24 +107,21 @@ public:
switch (update_req->type) {
case config_type::CT_ASSIGN_PRIMARY:
case config_type::CT_UPGRADE_TO_PRIMARY:
- pc.primary = update_req->node;
- pc.__set_hp_primary(update_req->hp_node);
+ SET_IP_AND_HOST_PORT(pc, primary, update_req->node,
update_req->hp_node);
replica_helper::remove_node(update_req->node, pc.secondaries);
replica_helper::remove_node(update_req->hp_node,
pc.hp_secondaries);
break;
case config_type::CT_ADD_SECONDARY:
case config_type::CT_ADD_SECONDARY_FOR_LB:
- pc.secondaries.push_back(update_req->node);
- pc.hp_secondaries.push_back(update_req->hp_node);
+ ADD_IP_AND_HOST_PORT(pc, secondaries, update_req->node,
update_req->hp_node);
update_req->type = config_type::CT_UPGRADE_TO_SECONDARY;
break;
case config_type::CT_REMOVE:
case config_type::CT_DOWNGRADE_TO_INACTIVE:
if (update_req->hp_node == pc.hp_primary) {
- pc.primary.set_invalid();
- pc.hp_primary.reset();
+ RESET_IP_AND_HOST_PORT(pc, primary);
} else {
replica_helper::remove_node(update_req->node, pc.secondaries);
replica_helper::remove_node(update_req->hp_node,
pc.hp_secondaries);
@@ -132,10 +129,8 @@ public:
break;
case config_type::CT_DOWNGRADE_TO_SECONDARY:
- pc.secondaries.push_back(pc.primary);
- pc.primary.set_invalid();
- pc.hp_secondaries.push_back(pc.hp_primary);
- pc.hp_primary.reset();
+ ADD_IP_AND_HOST_PORT(pc, secondaries, pc.primary, pc.hp_primary);
+ RESET_IP_AND_HOST_PORT(pc, primary);
break;
default:
break;
@@ -253,19 +248,13 @@ void meta_service_test_app::update_configuration_test()
generate_node_list(nodes, 4, 4);
dsn::partition_configuration &pc0 = app->partitions[0];
- pc0.primary = dsn::dns_resolver::instance().resolve_address(nodes[0]);
- pc0.__set_hp_primary(nodes[0]);
- pc0.secondaries = {dsn::dns_resolver::instance().resolve_address(nodes[1]),
-
dsn::dns_resolver::instance().resolve_address(nodes[2])};
- pc0.__set_hp_secondaries({nodes[1], nodes[2]});
+ SET_IP_AND_HOST_PORT_BY_DNS(pc0, primary, nodes[0]);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc0, secondaries, nodes[1], nodes[2]);
pc0.ballot = 3;
dsn::partition_configuration &pc1 = app->partitions[1];
- pc1.primary = dsn::dns_resolver::instance().resolve_address(nodes[1]);
- pc1.__set_hp_primary(nodes[1]);
- pc1.secondaries = {dsn::dns_resolver::instance().resolve_address(nodes[0]),
-
dsn::dns_resolver::instance().resolve_address(nodes[2])};
- pc1.__set_hp_secondaries({nodes[0], nodes[2]});
+ SET_IP_AND_HOST_PORT_BY_DNS(pc1, primary, nodes[1]);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc1, secondaries, nodes[0], nodes[2]);
pc1.ballot = 3;
ss->sync_apps_to_remote_storage();
@@ -337,11 +326,8 @@ void meta_service_test_app::adjust_dropped_size()
// first, the replica is healthy, and there are 2 dropped
dsn::partition_configuration &pc = app->partitions[0];
- pc.primary = dsn::dns_resolver::instance().resolve_address(nodes[0]);
- pc.__set_hp_primary(nodes[0]);
- pc.secondaries = {dsn::dns_resolver::instance().resolve_address(nodes[1]),
- dsn::dns_resolver::instance().resolve_address(nodes[2])};
- pc.__set_hp_secondaries({nodes[1], nodes[2]});
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes[0]);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, nodes[1], nodes[2]);
pc.ballot = 10;
config_context &cc = *get_config_context(ss->_all_apps, pc.pid);
@@ -359,8 +345,7 @@ void meta_service_test_app::adjust_dropped_size()
std::make_shared<configuration_update_request>();
req->config = pc;
req->config.ballot++;
-
req->config.secondaries.push_back(dsn::dns_resolver::instance().resolve_address(nodes[5]));
- req->config.__set_hp_secondaries({nodes[5]});
+ SET_IPS_AND_HOST_PORTS_BY_DNS(req->config, secondaries, nodes[5]);
req->info = info;
req->node = dsn::dns_resolver::instance().resolve_address(nodes[5]);
req->__set_hp_node(nodes[5]);
@@ -520,11 +505,8 @@ void meta_service_test_app::cannot_run_balancer_test()
svc->_state->_table_metric_entities.create_entity(info.app_id,
info.partition_count);
dsn::partition_configuration &pc = the_app->partitions[0];
- pc.primary = dsn::dns_resolver::instance().resolve_address(nodes[0]);
- pc.__set_hp_primary(nodes[0]);
- pc.secondaries = {dsn::dns_resolver::instance().resolve_address(nodes[1]),
- dsn::dns_resolver::instance().resolve_address(nodes[2])};
- pc.__set_hp_secondaries({nodes[1], nodes[2]});
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes[0]);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, nodes[1], nodes[2]);
#define REGENERATE_NODE_MAPPER
\
svc->_state->_nodes.clear();
\
@@ -541,15 +523,13 @@ void meta_service_test_app::cannot_run_balancer_test()
// all the partitions are not healthy
svc->_function_level.store(meta_function_level::fl_lively);
- pc.primary.set_invalid();
- pc.hp_primary.reset();
+ RESET_IP_AND_HOST_PORT(pc, primary);
REGENERATE_NODE_MAPPER;
ASSERT_FALSE(svc->_state->check_all_partitions());
// some dropped node still exists in nodes
- pc.primary = dsn::dns_resolver::instance().resolve_address(nodes[0]);
- pc.__set_hp_primary(nodes[0]);
+ SET_IP_AND_HOST_PORT_BY_DNS(pc, primary, nodes[0]);
REGENERATE_NODE_MAPPER;
get_node_state(svc->_state->_nodes, pc.hp_primary, true)->set_alive(false);
ASSERT_FALSE(svc->_state->check_all_partitions());
diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp
b/src/replica/bulk_load/replica_bulk_loader.cpp
index 6f747bac1..a96b8337f 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -246,10 +246,9 @@ void replica_bulk_loader::on_group_bulk_load(const
group_bulk_load_request &requ
return;
}
- LOG_INFO_PREFIX("receive group_bulk_load request, primary address =
{}({}), ballot = {}, "
+ LOG_INFO_PREFIX("receive group_bulk_load request, primary address = {},
ballot = {}, "
"meta bulk_load_status = {}, local bulk_load_status = {}",
- request.config.hp_primary,
- request.config.primary,
+ FMT_HOST_PORT_AND_IP(request.config, primary),
request.config.ballot,
enum_to_string(request.meta_bulk_load_status),
enum_to_string(_status));
@@ -933,9 +932,8 @@ void
replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo
_replica->_primary_states.membership.primary,
_replica->_primary_states.membership.hp_primary,
primary_state);
- LOG_INFO_PREFIX("primary = {}({}), download progress = {}%, status = {}",
- _replica->_primary_states.membership.hp_primary,
- _replica->_primary_states.membership.primary,
+ LOG_INFO_PREFIX("primary = {}, download progress = {}%, status = {}",
+ FMT_HOST_PORT_AND_IP(_replica->_primary_states.membership,
primary),
primary_state.download_progress,
primary_state.download_status);
@@ -976,9 +974,8 @@ void
replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
_replica->_primary_states.membership.primary,
_replica->_primary_states.membership.hp_primary,
primary_state);
- LOG_INFO_PREFIX("primary = {}({}), ingestion status = {}",
- _replica->_primary_states.membership.hp_primary,
- _replica->_primary_states.membership.primary,
+ LOG_INFO_PREFIX("primary = {}, ingestion status = {}",
+ FMT_HOST_PORT_AND_IP(_replica->_primary_states.membership,
primary),
enum_to_string(primary_state.ingest_status));
bool is_group_ingestion_finish =
@@ -1025,7 +1022,7 @@ void
replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response)
_replica->_primary_states.membership.hp_primary,
primary_state);
LOG_INFO_PREFIX("primary = {}, bulk load states cleaned_up = {}",
- _replica->_primary_states.membership.primary,
+ FMT_HOST_PORT_AND_IP(_replica->_primary_states.membership,
primary),
primary_state.is_cleaned_up);
bool group_flag = (primary_state.is_cleaned_up) &&
@@ -1063,9 +1060,8 @@ void
replica_bulk_loader::report_group_is_paused(bulk_load_response &response)
_replica->_primary_states.membership.primary,
_replica->_primary_states.membership.hp_primary,
primary_state);
- LOG_INFO_PREFIX("primary = {}({}), bulk_load is_paused = {}",
- _replica->_primary_states.membership.hp_primary,
- _replica->_primary_states.membership.primary,
+ LOG_INFO_PREFIX("primary = {}, bulk_load is_paused = {}",
+ FMT_HOST_PORT_AND_IP(_replica->_primary_states.membership,
primary),
primary_state.is_paused);
bool group_is_paused = primary_state.is_paused &&
diff --git a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
index 95dbad2cb..9fb744bd6 100644
--- a/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
+++ b/src/replica/bulk_load/test/replica_bulk_loader_test.cpp
@@ -241,13 +241,8 @@ public:
config.max_replica_count = 3;
config.pid = PID;
config.ballot = BALLOT;
- config.primary = PRIMARY;
- config.secondaries.emplace_back(SECONDARY);
- config.secondaries.emplace_back(SECONDARY2);
- config.__set_hp_primary(PRIMARY_HP);
- config.__set_hp_secondaries({});
- config.hp_secondaries.emplace_back(SECONDARY_HP);
- config.hp_secondaries.emplace_back(SECONDARY_HP2);
+ SET_IP_AND_HOST_PORT_BY_DNS(config, primary, PRIMARY_HP);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(config, secondaries, SECONDARY_HP,
SECONDARY_HP2);
_replica->set_primary_partition_configuration(config);
}
diff --git a/src/replica/duplication/replica_follower.cpp
b/src/replica/duplication/replica_follower.cpp
index 2818e8060..6678017c7 100644
--- a/src/replica/duplication/replica_follower.cpp
+++ b/src/replica/duplication/replica_follower.cpp
@@ -186,10 +186,9 @@ error_code
replica_follower::update_master_replica_config(error_code err, query_
// since the request just specify one partition, the result size is single
_master_replica_config = resp.partitions[0];
LOG_INFO_PREFIX(
- "query master[{}]({}) config successfully and update local config:
remote={}, gpid={}",
+ "query master[{}] config successfully and update local config:
remote={}, gpid={}",
master_replica_name(),
- _master_replica_config.hp_primary,
- _master_replica_config.primary,
+ FMT_HOST_PORT_AND_IP(_master_replica_config, primary),
_master_replica_config.pid);
return ERR_OK;
}
diff --git a/src/replica/duplication/replica_follower.h
b/src/replica/duplication/replica_follower.h
index c57552f94..d6711c4b0 100644
--- a/src/replica/duplication/replica_follower.h
+++ b/src/replica/duplication/replica_follower.h
@@ -26,7 +26,6 @@
#include "common/gpid.h"
#include "dsn.layer2_types.h"
#include "replica/replica_base.h"
-#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/task/task_tracker.h"
#include "utils/error_code.h"
@@ -80,10 +79,9 @@ private:
{
std::string app_info = fmt::format("{}.{}", _master_cluster_name,
_master_app_name);
if (_master_replica_config.hp_primary) {
- return fmt::format("{}({}({})|{})",
+ return fmt::format("{}({}|{})",
app_info,
- _master_replica_config.hp_primary,
- _master_replica_config.primary,
+ FMT_HOST_PORT_AND_IP(_master_replica_config,
primary),
_master_replica_config.pid);
}
return app_info;
diff --git a/src/replica/duplication/test/replica_follower_test.cpp
b/src/replica/duplication/test/replica_follower_test.cpp
index 952440767..34585a982 100644
--- a/src/replica/duplication/test/replica_follower_test.cpp
+++ b/src/replica/duplication/test/replica_follower_test.cpp
@@ -241,8 +241,7 @@ TEST_P(replica_follower_test,
test_update_master_replica_config)
ASSERT_FALSE(master_replica_config(follower).hp_primary);
resp.partitions.clear();
- p.primary = {};
- p.__set_hp_primary(host_port());
+ RESET_IP_AND_HOST_PORT(p, primary);
p.pid = gpid(2, 1);
resp.partitions.emplace_back(p);
ASSERT_EQ(update_master_replica_config(follower, resp), ERR_INVALID_STATE);
@@ -256,13 +255,8 @@ TEST_P(replica_follower_test,
test_update_master_replica_config)
const host_port secondary1("localhost", 34802);
const host_port secondary2("localhost", 34803);
- p.primary = dsn::dns_resolver::instance().resolve_address(primary);
-
p.secondaries.emplace_back(dsn::dns_resolver::instance().resolve_address(secondary1));
-
p.secondaries.emplace_back(dsn::dns_resolver::instance().resolve_address(secondary2));
- p.__set_hp_primary(primary);
- p.__set_hp_secondaries({});
- p.hp_secondaries.emplace_back(secondary1);
- p.hp_secondaries.emplace_back(secondary2);
+ SET_IP_AND_HOST_PORT_BY_DNS(p, primary, primary);
+ SET_IPS_AND_HOST_PORTS_BY_DNS(p, secondaries, secondary1, secondary2);
resp.partitions.emplace_back(p);
ASSERT_EQ(update_master_replica_config(follower, resp), ERR_OK);
ASSERT_EQ(master_replica_config(follower).primary, p.primary);
diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp
index cd8ff6c3a..c35a8ad7b 100644
--- a/src/replica/replica_config.cpp
+++ b/src/replica/replica_config.cpp
@@ -164,8 +164,8 @@ void replica::assign_primary(configuration_update_request
&proposal)
return;
}
- proposal.config.primary = _stub->primary_address();
- proposal.config.__set_hp_primary(_stub->primary_host_port());
+ SET_IP_AND_HOST_PORT(
+ proposal.config, primary, _stub->primary_address(),
_stub->primary_host_port());
replica_helper::remove_node(_stub->primary_address(),
proposal.config.secondaries);
replica_helper::remove_node(_stub->primary_host_port(),
proposal.config.hp_secondaries);
@@ -252,11 +252,7 @@ void replica::upgrade_to_secondary_on_primary(const
::dsn::host_port &node)
partition_configuration new_config = _primary_states.membership;
// add secondary
- if (!new_config.__isset.hp_secondaries) {
- new_config.__set_hp_secondaries({});
- }
- new_config.hp_secondaries.push_back(node);
-
new_config.secondaries.push_back(dsn::dns_resolver::instance().resolve_address(node));
+ ADD_IP_AND_HOST_PORT_BY_DNS(new_config, secondaries, node);
update_configuration_on_meta_server(config_type::CT_UPGRADE_TO_SECONDARY,
node, new_config);
}
@@ -271,14 +267,8 @@ void
replica::downgrade_to_secondary_on_primary(configuration_update_request &pr
CHECK(proposal.config.hp_secondaries ==
_primary_states.membership.hp_secondaries, "");
CHECK_EQ(proposal.hp_node, proposal.config.hp_primary);
- proposal.config.primary.set_invalid();
- proposal.config.__set_hp_primary(host_port());
- proposal.config.secondaries.push_back(proposal.node);
- if (!proposal.config.__isset.hp_secondaries) {
- proposal.config.__set_hp_secondaries({});
- }
- proposal.config.hp_secondaries.push_back(proposal.hp_node);
-
+ RESET_IP_AND_HOST_PORT(proposal.config, primary);
+ ADD_IP_AND_HOST_PORT(proposal.config, secondaries, proposal.node,
proposal.hp_node);
update_configuration_on_meta_server(
config_type::CT_DOWNGRADE_TO_SECONDARY, proposal.hp_node,
proposal.config);
}
@@ -293,8 +283,7 @@ void
replica::downgrade_to_inactive_on_primary(configuration_update_request &pro
CHECK(proposal.config.hp_secondaries ==
_primary_states.membership.hp_secondaries, "");
if (proposal.hp_node == proposal.config.hp_primary) {
- proposal.config.primary.set_invalid();
- proposal.config.hp_primary.reset();
+ RESET_IP_AND_HOST_PORT(proposal.config, primary);
} else {
CHECK(replica_helper::remove_node(proposal.node,
proposal.config.secondaries) &&
replica_helper::remove_node(proposal.hp_node,
proposal.config.hp_secondaries),
@@ -320,8 +309,7 @@ void replica::remove(configuration_update_request &proposal)
switch (st) {
case partition_status::PS_PRIMARY:
CHECK_EQ(proposal.config.hp_primary, proposal.hp_node);
- proposal.config.primary.set_invalid();
- proposal.config.hp_primary.reset();
+ RESET_IP_AND_HOST_PORT(proposal.config, primary);
break;
case partition_status::PS_SECONDARY: {
CHECK(replica_helper::remove_node(proposal.node,
proposal.config.secondaries) &&
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 9b539d648..136556efc 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -1471,8 +1471,7 @@ void replica_stub::remove_replica_on_meta_server(const
app_info &info,
request->type = config_type::CT_DOWNGRADE_TO_INACTIVE;
if (_primary_host_port == config.hp_primary) {
- request->config.primary.set_invalid();
- request->config.hp_primary.reset();
+ RESET_IP_AND_HOST_PORT(request->config, primary);
} else if (replica_helper::remove_node(primary_address(),
request->config.secondaries) &&
replica_helper::remove_node(_primary_host_port,
request->config.hp_secondaries)) {
} else {
diff --git a/src/replica/split/replica_split_manager.cpp
b/src/replica/split/replica_split_manager.cpp
index 26d2aa01b..58dee52dc 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -991,8 +991,7 @@ void replica_split_manager::register_child_on_meta(ballot
b) // on primary paren
partition_configuration child_config =
_replica->_primary_states.membership;
child_config.ballot++;
child_config.last_committed_decree = 0;
- child_config.last_drops.clear();
- child_config.hp_last_drops.clear();
+ CLEAR_IP_AND_HOST_PORT(child_config, last_drops);
child_config.pid.set_partition_index(_replica->_app_info.partition_count +
get_gpid().get_partition_index());
diff --git a/src/replica/split/test/replica_split_test.cpp
b/src/replica/split/test/replica_split_test.cpp
index ce45f5448..6694c370b 100644
--- a/src/replica/split/test/replica_split_test.cpp
+++ b/src/replica/split/test/replica_split_test.cpp
@@ -191,13 +191,10 @@ public:
config.max_replica_count = 3;
config.pid = PARENT_GPID;
config.ballot = INIT_BALLOT;
- config.hp_primary = PRIMARY;
- config.primary = PRIMARY_ADDR;
- config.__set_hp_secondaries({SECONDARY});
- config.secondaries.emplace_back(SECONDARY_ADDR);
+ SET_IP_AND_HOST_PORT_BY_DNS(config, primary, PRIMARY);
+ ADD_IP_AND_HOST_PORT_BY_DNS(config, secondaries, SECONDARY);
if (!lack_of_secondary) {
- config.secondaries.emplace_back(SECONDARY_ADDR2);
- config.hp_secondaries.emplace_back(SECONDARY2);
+ ADD_IP_AND_HOST_PORT_BY_DNS(config, secondaries, SECONDARY2);
}
_parent_replica->set_primary_partition_configuration(config);
}
@@ -363,8 +360,7 @@ public:
req.parent_config.pid = PARENT_GPID;
req.parent_config.ballot = INIT_BALLOT;
req.parent_config.last_committed_decree = DECREE;
- req.parent_config.primary = PRIMARY_ADDR;
- req.parent_config.__set_hp_primary(PRIMARY);
+ SET_IP_AND_HOST_PORT_BY_DNS(req.parent_config, primary, PRIMARY);
req.child_config.pid = CHILD_GPID;
req.child_config.ballot = INIT_BALLOT + 1;
req.child_config.last_committed_decree = 0;
diff --git a/src/runtime/rpc/rpc_host_port.h b/src/runtime/rpc/rpc_host_port.h
index 762e476de..1b8cc70b5 100644
--- a/src/runtime/rpc/rpc_host_port.h
+++ b/src/runtime/rpc/rpc_host_port.h
@@ -49,32 +49,147 @@ class TProtocol;
#define GET_HOST_PORT(obj, field, target)
\
do {
\
const auto &_obj = (obj);
\
+ auto &_target = (target);
\
if (_obj.__isset.hp_##field) {
\
- target = _obj.hp_##field;
\
+ _target = _obj.hp_##field;
\
} else {
\
- target = std::move(dsn::host_port::from_address(_obj.field));
\
+ _target = std::move(dsn::host_port::from_address(_obj.field));
\
}
\
} while (0)
-// Set 'addr' and 'hp' to the '<field>' and optional 'hp_<field>' of 'obj'.
The type of the
+// Set 'addr' and 'hp' to the '<field>' and optional 'hp_<field>' of 'obj'.
The types of the
// fields are rpc_address and host_port, respectively.
#define SET_IP_AND_HOST_PORT(obj, field, addr, hp)
\
do {
\
auto &_obj = (obj);
\
- _obj.field = addr;
\
+ _obj.field = (addr);
\
_obj.__set_hp_##field(hp);
\
} while (0)
+// Set 'hp' and its DNS resolved rpc_address to the optional 'hp_<field>' and
'<field>' of 'obj'.
+// The types of the fields are host_port and rpc_address, respectively.
+#define SET_IP_AND_HOST_PORT_BY_DNS(obj, field, hp)
\
+ do {
\
+ auto &_obj = (obj);
\
+ const auto &_hp = (hp);
\
+ _obj.field = dsn::dns_resolver::instance().resolve_address(_hp);
\
+ _obj.__set_hp_##field(_hp);
\
+ } while (0)
+
+// Reset the '<field>' and optional 'hp_<field>' of 'obj'. The types of the
fields are rpc_address
+// and host_port, respectively.
+#define RESET_IP_AND_HOST_PORT(obj, field)
\
+ do {
\
+ auto &_obj = (obj);
\
+ _obj.field.set_invalid();
\
+ _obj.hp_##field.reset();
\
+ } while (0)
+
+// Clear the '<field>' and optional 'hp_<field>' of 'obj'. The types of the
fields are std::vector
+// with rpc_address and host_port elements, respectively.
+#define CLEAR_IP_AND_HOST_PORT(obj, field)
\
+ do {
\
+ auto &_obj = (obj);
\
+ _obj.field.clear();
\
+ _obj.__set_hp_##field({});
\
+ } while (0)
+
+// Add 'addr' and 'hp' to the vector '<field>' and optional vector
'hp_<field>' of 'obj'. The types
+// of the fields are std::vector<rpc_address> and std::vector<host_port>,
respectively.
+#define ADD_IP_AND_HOST_PORT(obj, field, addr, hp)
\
+ do {
\
+ auto &_obj = (obj);
\
+ _obj.field.push_back(addr);
\
+ if (!_obj.__isset.hp_##field) {
\
+ _obj.__set_hp_##field({hp});
\
+ } else {
\
+ _obj.hp_##field.push_back(hp);
\
+ }
\
+ } while (0)
+
+// Add 'hp' and its DNS resolved rpc_address to the optional vector
'hp_<field>' and vector
+// '<field>' of 'obj'. The types of the fields are std::vector<host_port> and
+// std::vector<rpc_address>, respectively.
+#define ADD_IP_AND_HOST_PORT_BY_DNS(obj, field, hp)
\
+ do {
\
+ auto &_obj = (obj);
\
+ auto &_hp = (hp);
\
+
_obj.field.push_back(dsn::dns_resolver::instance().resolve_address(_hp));
\
+ if (!_obj.__isset.hp_##field) {
\
+ _obj.__set_hp_##field({_hp});
\
+ } else {
\
+ _obj.hp_##field.push_back(_hp);
\
+ }
\
+ } while (0)
+
+#define SET_IPS_AND_HOST_PORTS_BY_DNS_1(obj, field, hp1)
\
+ do {
\
+ auto &_obj = (obj);
\
+ auto &_hp1 = (hp1);
\
+ _obj.field = {dsn::dns_resolver::instance().resolve_address(_hp1)};
\
+ _obj.__set_hp_##field({_hp1});
\
+ } while (0)
+#define SET_IPS_AND_HOST_PORTS_BY_DNS_2(obj, field, hp1, hp2)
\
+ do {
\
+ auto &_obj = (obj);
\
+ auto &_hp1 = (hp1);
\
+ auto &_hp2 = (hp2);
\
+ _obj.field = {dsn::dns_resolver::instance().resolve_address(_hp1),
\
+ dsn::dns_resolver::instance().resolve_address(_hp2)};
\
+ _obj.__set_hp_##field({_hp1, _hp2});
\
+ } while (0)
+#define SET_IPS_AND_HOST_PORTS_BY_DNS_3(obj, field, hp1, hp2, hp3)
\
+ do {
\
+ auto &_obj = (obj);
\
+ auto &_hp1 = (hp1);
\
+ auto &_hp2 = (hp2);
\
+ auto &_hp3 = (hp3);
\
+ _obj.field = {dsn::dns_resolver::instance().resolve_address(_hp1),
\
+ dsn::dns_resolver::instance().resolve_address(_hp2),
\
+ dsn::dns_resolver::instance().resolve_address(_hp3)};
\
+ _obj.__set_hp_##field({_hp1, _hp2, _hp3});
\
+ } while (0)
+#define SET_IPS_AND_HOST_PORTS_BY_DNS_GET_MACRO(hp1, hp2, hp3, NAME, ...) NAME
+#define SET_IPS_AND_HOST_PORTS_BY_DNS_GET_MACRO_(tuple)
\
+ SET_IPS_AND_HOST_PORTS_BY_DNS_GET_MACRO tuple
+
+// Set the 1 to 3 host_ports passed as '...' and their DNS resolved rpc_addresses to the vector '<field>' and
optional vector
+// 'hp_<field>' of 'obj'. The types of the fields are std::vector<rpc_address>
and
+// std::vector<host_port>, respectively.
+#define SET_IPS_AND_HOST_PORTS_BY_DNS(obj, field, ...)
\
+ SET_IPS_AND_HOST_PORTS_BY_DNS_GET_MACRO_((__VA_ARGS__,
\
+ SET_IPS_AND_HOST_PORTS_BY_DNS_3,
\
+ SET_IPS_AND_HOST_PORTS_BY_DNS_2,
\
+
SET_IPS_AND_HOST_PORTS_BY_DNS_1)) \
+ (obj, field, __VA_ARGS__);
+
+// Head insert 'hp' and its DNS resolved rpc_address to the optional vector
'hp_<field>' and vector
+// '<field>' of 'obj'. The types of the fields are std::vector<host_port> and
+// std::vector<rpc_address>, respectively.
+#define HEAD_INSERT_IP_AND_HOST_PORT_BY_DNS(obj, field, hp)
\
+ do {
\
+ auto &_obj = (obj);
\
+ auto &_hp = (hp);
\
+ _obj.field.insert(_obj.field.begin(),
dsn::dns_resolver::instance().resolve_address(_hp)); \
+ if (!_obj.__isset.hp_##field) {
\
+ _obj.__set_hp_##field({_hp});
\
+ } else {
\
+ _obj.hp_##field.insert(_obj.hp_##field.begin(), _hp);
\
+ }
\
+ } while (0)
+
+// TODO(yingchun): the 'hp' can be reduced.
// Set 'value' to the '<field>' map and optional 'hp_<field>' map of 'obj'.
The key of the
// maps are rpc_address and host_port type and indexed by 'addr' and 'hp',
respectively.
#define SET_VALUE_FROM_IP_AND_HOST_PORT(obj, field, addr, hp, value)
\
do {
\
auto &_obj = (obj);
\
- _obj.field[addr] = value;
\
+ const auto &_value = (value);
\
+ _obj.field[addr] = _value;
\
if (!_obj.__isset.hp_##field) {
\
_obj.__set_hp_##field({});
\
}
\
- _obj.hp_##field[hp] = value;
\
+ _obj.hp_##field[hp] = _value;
\
} while (0)
// Set 'value' to the '<field>' map and optional 'hp_<field>' map of 'obj'.
The key of the
@@ -82,8 +197,9 @@ class TProtocol;
// 'addr', respectively.
#define SET_VALUE_FROM_HOST_PORT(obj, field, hp, value)
\
do {
\
- const auto addr = dsn::dns_resolver::instance().resolve_address(hp);
\
- SET_VALUE_FROM_IP_AND_HOST_PORT(obj, field, addr, hp, value);
\
+ const auto &_hp = (hp);
\
+ const auto addr = dsn::dns_resolver::instance().resolve_address(_hp);
\
+ SET_VALUE_FROM_IP_AND_HOST_PORT(obj, field, addr, _hp, value);
\
} while (0)
#define FMT_HOST_PORT_AND_IP(obj, field) fmt::format("{}({})",
(obj).hp_##field, (obj).field)
diff --git a/src/runtime/test/host_port_test.cpp
b/src/runtime/test/host_port_test.cpp
index 9c397c2a1..f630a5aa5 100644
--- a/src/runtime/test/host_port_test.cpp
+++ b/src/runtime/test/host_port_test.cpp
@@ -17,12 +17,18 @@
* under the License.
*/
+#include <fmt/core.h>
+#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include "bulk_load_types.h"
+#include "common/serialization_helper/dsn.layer2_types.h"
+#include "fd_types.h"
#include "gtest/gtest.h"
+#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/group_address.h"
#include "runtime/rpc/group_host_port.h"
#include "runtime/rpc/rpc_address.h"
@@ -255,4 +261,164 @@ TEST(host_port_test, thrift_parser)
send_and_check_host_port_by_serialize(hp2, DSF_THRIFT_JSON);
}
+TEST(host_port_test, test_macros)
+{
+ static const host_port kHp1("localhost", 8081);
+ static const host_port kHp2("localhost", 8082);
+ static const host_port kHp3("localhost", 8083);
+ static const rpc_address kAddr1 =
dns_resolver::instance().resolve_address(kHp1);
+ static const rpc_address kAddr2 =
dns_resolver::instance().resolve_address(kHp2);
+ static const rpc_address kAddr3 =
dns_resolver::instance().resolve_address(kHp3);
+
+ // Test GET_HOST_PORT-1.
+ {
+ fd::beacon_msg beacon;
+ host_port hp_from_node;
+ GET_HOST_PORT(beacon, from_node, hp_from_node);
+ ASSERT_FALSE(hp_from_node);
+ }
+ // Test GET_HOST_PORT-2.
+ {
+ fd::beacon_msg beacon;
+ host_port hp_from_node;
+ beacon.from_node = kAddr1;
+ GET_HOST_PORT(beacon, from_node, hp_from_node);
+ ASSERT_TRUE(hp_from_node);
+ ASSERT_EQ(kHp1, hp_from_node);
+ ASSERT_EQ(kAddr1,
dns_resolver::instance().resolve_address(hp_from_node));
+ }
+ // Test GET_HOST_PORT-3.
+ {
+ fd::beacon_msg beacon;
+ host_port hp_from_node;
+ beacon.__set_hp_from_node(kHp1);
+ GET_HOST_PORT(beacon, from_node, hp_from_node);
+ ASSERT_TRUE(hp_from_node);
+ ASSERT_EQ(kHp1, hp_from_node);
+ ASSERT_EQ(kAddr1,
dns_resolver::instance().resolve_address(hp_from_node));
+ }
+
+ // Test SET_IP_AND_HOST_PORT.
+ {
+ fd::beacon_msg beacon;
+ SET_IP_AND_HOST_PORT(beacon, from_node, kAddr1, kHp1);
+ ASSERT_EQ(kAddr1, beacon.from_node);
+ ASSERT_EQ(kHp1, beacon.hp_from_node);
+ }
+
+ // Test SET_IP_AND_HOST_PORT_BY_DNS.
+ {
+ fd::beacon_msg beacon;
+ SET_IP_AND_HOST_PORT_BY_DNS(beacon, from_node, kHp1);
+ ASSERT_EQ(kAddr1, beacon.from_node);
+ ASSERT_EQ(kHp1, beacon.hp_from_node);
+ }
+
+ // Test RESET_IP_AND_HOST_PORT.
+ {
+ fd::beacon_msg beacon;
+ SET_IP_AND_HOST_PORT_BY_DNS(beacon, from_node, kHp1);
+ ASSERT_EQ(kAddr1, beacon.from_node);
+ ASSERT_EQ(kHp1, beacon.hp_from_node);
+ RESET_IP_AND_HOST_PORT(beacon, from_node);
+ ASSERT_FALSE(beacon.from_node);
+ ASSERT_FALSE(beacon.hp_from_node);
+ }
+
+ // Test ADD_IP_AND_HOST_PORT.
+ {
+ partition_configuration pc;
+ ADD_IP_AND_HOST_PORT(pc, secondaries, kAddr1, kHp1);
+ ASSERT_EQ(1, pc.secondaries.size());
+ ASSERT_EQ(1, pc.hp_secondaries.size());
+ ASSERT_EQ(kAddr1, pc.secondaries[0]);
+ ASSERT_EQ(kHp1, pc.hp_secondaries[0]);
+ ADD_IP_AND_HOST_PORT(pc, secondaries, kAddr2, kHp2);
+ ASSERT_EQ(2, pc.secondaries.size());
+ ASSERT_EQ(2, pc.hp_secondaries.size());
+ ASSERT_EQ(kAddr2, pc.secondaries[1]);
+ ASSERT_EQ(kHp2, pc.hp_secondaries[1]);
+ }
+
+ // Test ADD_IP_AND_HOST_PORT_BY_DNS.
+ {
+ partition_configuration pc;
+ ADD_IP_AND_HOST_PORT_BY_DNS(pc, secondaries, kHp1);
+ ASSERT_EQ(1, pc.secondaries.size());
+ ASSERT_EQ(1, pc.hp_secondaries.size());
+ ASSERT_EQ(kAddr1, pc.secondaries[0]);
+ ASSERT_EQ(kHp1, pc.hp_secondaries[0]);
+ ADD_IP_AND_HOST_PORT_BY_DNS(pc, secondaries, kHp2);
+ ASSERT_EQ(2, pc.secondaries.size());
+ ASSERT_EQ(2, pc.hp_secondaries.size());
+ ASSERT_EQ(kAddr2, pc.secondaries[1]);
+ ASSERT_EQ(kHp2, pc.hp_secondaries[1]);
+ }
+
+ // Test SET_IPS_AND_HOST_PORTS_BY_DNS.
+ {
+ partition_configuration pc;
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, kHp1);
+ ASSERT_EQ(1, pc.secondaries.size());
+ ASSERT_EQ(1, pc.hp_secondaries.size());
+ ASSERT_EQ(kAddr1, pc.secondaries[0]);
+ ASSERT_EQ(kHp1, pc.hp_secondaries[0]);
+
+ SET_IPS_AND_HOST_PORTS_BY_DNS(pc, secondaries, kHp2, kHp3);
+ ASSERT_EQ(2, pc.secondaries.size());
+ ASSERT_EQ(2, pc.hp_secondaries.size());
+ ASSERT_EQ(kAddr2, pc.secondaries[0]);
+ ASSERT_EQ(kHp2, pc.hp_secondaries[0]);
+ ASSERT_EQ(kAddr3, pc.secondaries[1]);
+ ASSERT_EQ(kHp3, pc.hp_secondaries[1]);
+ }
+
+ // Test CLEAR_IP_AND_HOST_PORT.
+ {
+ partition_configuration pc;
+ ADD_IP_AND_HOST_PORT(pc, secondaries, kAddr1, kHp1);
+ CLEAR_IP_AND_HOST_PORT(pc, secondaries);
+ ASSERT_TRUE(pc.secondaries.empty());
+ ASSERT_TRUE(pc.hp_secondaries.empty());
+ }
+
+ // Test SET_VALUE_FROM_IP_AND_HOST_PORT.
+ {
+ static const int kProgress = 88;
+ replication::bulk_load_response response;
+ replication::partition_bulk_load_state primary_state;
+ primary_state.__set_download_progress(kProgress);
+ SET_VALUE_FROM_IP_AND_HOST_PORT(
+ response, group_bulk_load_state, kAddr1, kHp1, primary_state);
+ ASSERT_EQ(1, response.group_bulk_load_state.size());
+ ASSERT_EQ(1, response.hp_group_bulk_load_state.size());
+ ASSERT_EQ(kAddr1, response.group_bulk_load_state.begin()->first);
+ ASSERT_EQ(kHp1, response.hp_group_bulk_load_state.begin()->first);
+ ASSERT_EQ(kProgress,
response.group_bulk_load_state.begin()->second.download_progress);
+ ASSERT_EQ(kProgress,
response.hp_group_bulk_load_state.begin()->second.download_progress);
+ }
+
+ // Test SET_VALUE_FROM_HOST_PORT.
+ {
+ static const int kProgress = 88;
+ replication::bulk_load_response response;
+ replication::partition_bulk_load_state primary_state;
+ primary_state.__set_download_progress(kProgress);
+ SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, kHp1,
primary_state);
+ ASSERT_EQ(1, response.group_bulk_load_state.size());
+ ASSERT_EQ(1, response.hp_group_bulk_load_state.size());
+ ASSERT_EQ(kAddr1, response.group_bulk_load_state.begin()->first);
+ ASSERT_EQ(kHp1, response.hp_group_bulk_load_state.begin()->first);
+ ASSERT_EQ(kProgress,
response.group_bulk_load_state.begin()->second.download_progress);
+ ASSERT_EQ(kProgress,
response.hp_group_bulk_load_state.begin()->second.download_progress);
+ }
+
+ // Test FMT_HOST_PORT_AND_IP.
+ {
+ fd::beacon_msg beacon;
+ SET_IP_AND_HOST_PORT_BY_DNS(beacon, from_node, kHp1);
+ ASSERT_EQ(fmt::format("{}({})", kHp1, kAddr1),
FMT_HOST_PORT_AND_IP(beacon, from_node));
+ }
+}
+
} // namespace dsn
diff --git a/src/server/hotspot_partition_calculator.cpp
b/src/server/hotspot_partition_calculator.cpp
index 52e1bc503..93ab165d2 100644
--- a/src/server/hotspot_partition_calculator.cpp
+++ b/src/server/hotspot_partition_calculator.cpp
@@ -27,6 +27,7 @@
#include "common/gpid.h"
#include "common/serialization_helper/dsn.layer2_types.h"
#include "perf_counter/perf_counter.h"
+#include "runtime/rpc/rpc_host_port.h"
#include "server/hotspot_partition_stat.h"
#include "shell/command_executor.h"
#include "utils/error_code.h"
@@ -226,12 +227,12 @@ void
hotspot_partition_calculator::send_detect_hotkey_request(
auto error = _shell_context->ddl_client->detect_hotkey(
partitions[partition_index].hp_primary, req, resp);
- LOG_INFO("{} {} hotkey detection in {}.{}, server host_port: {}",
+ LOG_INFO("{} {} hotkey detection in {}.{}, server: {}",
(action == dsn::replication::detect_action::STOP) ? "Stop" :
"Start",
(hotkey_type == dsn::replication::hotkey_type::WRITE) ? "write" :
"read",
app_name,
partition_index,
- partitions[partition_index].hp_primary);
+ FMT_HOST_PORT_AND_IP(partitions[partition_index], primary));
if (error != dsn::ERR_OK) {
LOG_ERROR("Hotkey detect rpc sending failed, in {}.{}, error_hint:{}",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]