This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new bec725eec refactor: introduce `host_port::resolve()` and use it
instead of `dns_resolver::instance().resolve_address()` (#2326)
bec725eec is described below
commit bec725eeca66bb3fc2260a3498e679f5f1c40522
Author: Dan Wang <[email protected]>
AuthorDate: Tue Dec 23 14:17:48 2025 +0800
refactor: introduce `host_port::resolve()` and use it instead of
`dns_resolver::instance().resolve_address()` (#2326)
---
src/client/partition_resolver.cpp | 3 +-
src/client/partition_resolver_simple.cpp | 3 +-
src/client/replication_ddl_client.cpp | 3 +-
src/client/replication_ddl_client.h | 7 +--
src/client_lib/pegasus_client_impl.cpp | 3 +-
src/failure_detector/failure_detector.cpp | 9 ++-
src/failure_detector/failure_detector.h | 2 +-
src/meta/backup_engine.cpp | 3 +-
src/meta/cluster_balance_policy.cpp | 12 ++--
src/meta/duplication/meta_duplication_service.cpp | 10 +--
src/meta/meta_bulk_load_service.cpp | 3 +-
src/meta/meta_service.cpp | 2 +-
src/meta/meta_service.h | 7 +--
src/meta/server_state.cpp | 64 +++++++++----------
src/meta/test/meta_bulk_load_service_test.cpp | 9 ++-
src/meta/test/meta_partition_guardian_test.cpp | 15 +++--
src/replica/bulk_load/replica_bulk_loader.cpp | 3 +-
src/replica/duplication/replica_follower.cpp | 3 +-
src/replica/replica_2pc.cpp | 3 +-
src/replica/replica_check.cpp | 3 +-
src/replica/replica_config.cpp | 19 ++----
src/replica/replica_learn.cpp | 7 ++-
src/replica/replica_restore.cpp | 8 +--
src/replica/replica_stub.cpp | 12 ++--
src/replica/replica_stub.h | 8 +--
src/replica/split/replica_split_manager.cpp | 77 +++++++++++------------
src/replica/storage/simple_kv/test/client.cpp | 18 +-----
src/rpc/dns_resolver.cpp | 35 ++++++-----
src/rpc/dns_resolver.h | 15 ++++-
src/rpc/rpc_host_port.cpp | 4 +-
src/rpc/rpc_host_port.h | 33 +++++-----
src/rpc/test/host_port_test.cpp | 11 ++--
src/runtime/test/dns_resolver_test.cpp | 4 +-
src/runtime/test_utils.h | 2 +-
src/shell/command_helper.h | 12 ++--
35 files changed, 198 insertions(+), 234 deletions(-)
diff --git a/src/client/partition_resolver.cpp
b/src/client/partition_resolver.cpp
index 6267180fb..a2eef1272 100644
--- a/src/client/partition_resolver.cpp
+++ b/src/client/partition_resolver.cpp
@@ -31,7 +31,6 @@
#include "partition_resolver_manager.h"
#include "runtime/api_layer1.h"
#include "runtime/api_task.h"
-#include "rpc/dns_resolver.h"
#include "task/task_spec.h"
#include "utils/fmt_logging.h"
#include "utils/threadpool_code.h"
@@ -130,7 +129,7 @@ void partition_resolver::call_task(const
rpc_response_task_ptr &t)
}
hdr.gpid = result.pid;
}
- dsn_rpc_call(dns_resolver::instance().resolve_address(result.hp),
t.get());
+ dsn_rpc_call(result.hp.resolve(), t.get());
},
hdr.client.timeout_ms);
}
diff --git a/src/client/partition_resolver_simple.cpp
b/src/client/partition_resolver_simple.cpp
index db1d8e543..f398b79ed 100644
--- a/src/client/partition_resolver_simple.cpp
+++ b/src/client/partition_resolver_simple.cpp
@@ -34,7 +34,6 @@
#include "common/gpid.h"
#include "dsn.layer2_types.h"
#include "partition_resolver_simple.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_message.h"
#include "rpc/serialization.h"
#include "runtime/api_layer1.h"
@@ -262,7 +261,7 @@ task_ptr partition_resolver_simple::query_config(int
partition_index, int timeou
marshall(msg, req);
return rpc::call(
- dns_resolver::instance().resolve_address(_meta_server),
+ _meta_server.resolve(),
msg,
&_tracker,
[this, partition_index](error_code err, dsn::message_ex *req,
dsn::message_ex *resp) {
diff --git a/src/client/replication_ddl_client.cpp
b/src/client/replication_ddl_client.cpp
index 46398c963..52a561b15 100644
--- a/src/client/replication_ddl_client.cpp
+++ b/src/client/replication_ddl_client.cpp
@@ -49,6 +49,7 @@
#include "fmt/core.h"
#include "fmt/format.h"
#include "meta/meta_rpc_types.h"
+#include "rpc/dns_resolver.h"
#include "rpc/group_host_port.h"
#include "rpc/rpc_address.h"
#include "runtime/api_layer1.h"
@@ -1270,7 +1271,7 @@ void replication_ddl_client::end_meta_request(const
rpc_response_task_ptr &callb
return;
}
- rpc::call(dsn::dns_resolver::instance().resolve_address(_meta_server),
+ rpc::call(_meta_server.resolve(),
request,
&_tracker,
[this, attempt_count, callback](
diff --git a/src/client/replication_ddl_client.h
b/src/client/replication_ddl_client.h
index d2651bc2f..8ef734164 100644
--- a/src/client/replication_ddl_client.h
+++ b/src/client/replication_ddl_client.h
@@ -44,7 +44,6 @@
#include "meta_admin_types.h"
#include "partition_split_types.h"
#include "replica_admin_types.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_holder.h"
#include "rpc/rpc_host_port.h"
#include "rpc/rpc_message.h"
@@ -347,7 +346,7 @@ private:
auto task =
dsn::rpc::create_rpc_response_task(msg, nullptr,
empty_rpc_handler, reply_thread_hash);
- rpc::call(dsn::dns_resolver::instance().resolve_address(_meta_server),
+ rpc::call(_meta_server.resolve(),
msg,
&_tracker,
[this, task](
@@ -480,7 +479,7 @@ private:
error_code err = ERR_UNKNOWN;
for (int retry = 0; retry < MAX_RETRY; retry++) {
task_ptr task = rpc.call(
- dsn::dns_resolver::instance().resolve_address(_meta_server),
+ _meta_server.resolve(),
&_tracker,
[&err](error_code code) { err = code; },
reply_thread_hash);
@@ -505,7 +504,7 @@ private:
dsn::task_tracker tracker;
error_code err = ERR_UNKNOWN;
for (auto &rpc : rpcs) {
-
rpc.second.call(dsn::dns_resolver::instance().resolve_address(rpc.first),
+ rpc.second.call(rpc.first.resolve(),
&tracker,
[&err, &resps, &rpcs, &rpc](error_code code)
mutable {
err = code;
diff --git a/src/client_lib/pegasus_client_impl.cpp
b/src/client_lib/pegasus_client_impl.cpp
index c1b505155..e3c393692 100644
--- a/src/client_lib/pegasus_client_impl.cpp
+++ b/src/client_lib/pegasus_client_impl.cpp
@@ -34,7 +34,6 @@
#include "pegasus_client_impl.h"
#include "pegasus_key_schema.h"
#include "pegasus_utils.h"
-#include "rpc/dns_resolver.h"
#include "rpc/group_host_port.h"
#include "rpc/serialization.h"
#include "rrdb/rrdb.client.h"
@@ -1244,7 +1243,7 @@ void pegasus_client_impl::async_get_unordered_scanners(
query_cfg_request req;
req.app_name = _app_name;
- ::dsn::rpc::call(dns_resolver::instance().resolve_address(_meta_server),
+ ::dsn::rpc::call(_meta_server.resolve(),
RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX,
req,
nullptr,
diff --git a/src/failure_detector/failure_detector.cpp
b/src/failure_detector/failure_detector.cpp
index a9c330d7d..5544b15e4 100644
--- a/src/failure_detector/failure_detector.cpp
+++ b/src/failure_detector/failure_detector.cpp
@@ -31,18 +31,17 @@
#include <ctime>
#include <map>
#include <mutex>
+#include <string_view>
#include <type_traits>
#include <utility>
-#include <string_view>
#include "failure_detector/fd.code.definition.h"
#include "fd_types.h"
#include "fmt/core.h"
#include "fmt/format.h"
#include "nlohmann/json_fwd.hpp"
-#include "runtime/api_layer1.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
+#include "runtime/api_layer1.h"
#include "runtime/serverlet.h"
#include "task/async_calls.h"
#include "task/task_spec.h"
@@ -581,9 +580,9 @@ bool failure_detector::is_worker_connected(const
::dsn::host_port &node) const
void failure_detector::send_beacon(const host_port &target, uint64_t time)
{
- const auto &addr_target =
dsn::dns_resolver::instance().resolve_address(target);
+ const auto &addr_target = target.resolve();
beacon_msg beacon;
- beacon.time = time;
+ beacon.time = static_cast<int64_t>(time);
SET_IP_AND_HOST_PORT(beacon, from_node, dsn_primary_address(),
dsn_primary_host_port());
SET_IP_AND_HOST_PORT(beacon, to_node, addr_target, target);
beacon.__set_start_time(static_cast<int64_t>(dsn::utils::process_start_millis()));
diff --git a/src/failure_detector/failure_detector.h
b/src/failure_detector/failure_detector.h
index 775e78d7f..570c9dcca 100644
--- a/src/failure_detector/failure_detector.h
+++ b/src/failure_detector/failure_detector.h
@@ -235,7 +235,7 @@ protected:
dsn::task_tracker _tracker;
// Subclasses can override this method.
- virtual void send_beacon(const host_port &node, uint64_t time);
+ virtual void send_beacon(const host_port &target, uint64_t time);
};
} // namespace fd
} // namespace dsn
diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp
index be5347a2d..ee6ccb1f2 100644
--- a/src/meta/backup_engine.cpp
+++ b/src/meta/backup_engine.cpp
@@ -35,7 +35,6 @@
#include "meta/meta_backup_service.h"
#include "meta/meta_data.h"
#include "meta/meta_service.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_holder.h"
#include "rpc/rpc_host_port.h"
#include "runtime/api_layer1.h"
@@ -216,7 +215,7 @@ void backup_engine::backup_app_partition(const gpid &pid)
pid,
partition_primary);
backup_rpc rpc(std::move(req), RPC_COLD_BACKUP, 10000_ms, 0,
pid.thread_hash());
- rpc.call(dsn::dns_resolver::instance().resolve_address(partition_primary),
+ rpc.call(partition_primary.resolve(),
&_tracker,
[this, rpc, pid, partition_primary](error_code err) mutable {
on_backup_reply(err, rpc.response(), pid, partition_primary);
diff --git a/src/meta/cluster_balance_policy.cpp
b/src/meta/cluster_balance_policy.cpp
index d166d36c1..4905013b0 100644
--- a/src/meta/cluster_balance_policy.cpp
+++ b/src/meta/cluster_balance_policy.cpp
@@ -17,9 +17,9 @@
#include "cluster_balance_policy.h"
-#include <limits.h>
-#include <stdlib.h>
+#include <climits>
#include <cstdint>
+#include <cstdlib>
#include <functional>
#include <iterator>
#include <unordered_map>
@@ -347,6 +347,7 @@ bool cluster_balance_policy::get_next_move(const
cluster_migration_info &cluster
return found;
}
+// TODO(wangdan): refactor this function into utils.
template <typename S>
auto select_random(const S &s, size_t n)
{
@@ -367,8 +368,11 @@ bool cluster_balance_policy::pick_up_move(const
cluster_migration_info &cluster_
if (max_load_disk_set.empty()) {
return false;
}
- auto index = rand() % max_load_disk_set.size();
- auto max_load_disk = *select_random(max_load_disk_set, index);
+
+ // TODO(wangdan): consider using C++11 random library instead.
+ // NOLINTNEXTLINE(cert-msc30-c,cert-msc50-cpp)
+ const auto index = static_cast<size_t>(std::rand()) %
max_load_disk_set.size();
+ const auto max_load_disk = *select_random(max_load_disk_set, index);
LOG_INFO("most load disk({}) on node({}) is picked, has {} partition",
max_load_disk.node,
max_load_disk.disk_tag,
diff --git a/src/meta/duplication/meta_duplication_service.cpp
b/src/meta/duplication/meta_duplication_service.cpp
index 971a4beb4..407007c22 100644
--- a/src/meta/duplication/meta_duplication_service.cpp
+++ b/src/meta/duplication/meta_duplication_service.cpp
@@ -25,8 +25,8 @@
#include <string_view>
#include <type_traits>
-#include "common/duplication_common.h"
#include "common/common.h"
+#include "common/duplication_common.h"
#include "common/gpid.h"
#include "common/replication.codes.h"
#include "common/replication_enums.h"
@@ -39,7 +39,6 @@
#include "meta_admin_types.h"
#include "meta_duplication_service.h"
#include "metadata_types.h"
-#include "rpc/dns_resolver.h"
#include "rpc/group_host_port.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"
@@ -591,10 +590,7 @@ void
meta_duplication_service::do_create_follower_app_for_duplication(
app->app_name,
duplication_status_to_string(dup->status()));
- rpc::call(dsn::dns_resolver::instance().resolve_address(meta_servers),
- msg,
- _meta_svc->tracker(),
- std::move(create_callback));
+ rpc::call(meta_servers.resolve(), msg, _meta_svc->tracker(),
std::move(create_callback));
}
void meta_duplication_service::on_follower_app_creating_for_duplication(
@@ -747,7 +743,7 @@ void
meta_duplication_service::check_follower_app_if_create_completed(
duplication_status_to_string(dup->status()));
rpc::call(
- dsn::dns_resolver::instance().resolve_address(meta_servers),
+ meta_servers.resolve(),
msg,
_meta_svc->tracker(),
[dup, this](error_code err, query_cfg_response &&resp) mutable {
diff --git a/src/meta/meta_bulk_load_service.cpp
b/src/meta/meta_bulk_load_service.cpp
index 40aa6d90d..a0552a335 100644
--- a/src/meta/meta_bulk_load_service.cpp
+++ b/src/meta/meta_bulk_load_service.cpp
@@ -40,7 +40,6 @@
#include "meta/meta_state_service.h"
#include "meta/server_state.h"
#include "meta_admin_types.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_holder.h"
#include "rpc/rpc_message.h"
@@ -1622,7 +1621,7 @@ void
bulk_load_service::on_query_bulk_load_status(query_bulk_load_rpc rpc)
std::map<rpc_address, partition_bulk_load_state> pbls_by_addrs;
for (const auto &[hp, pbls] : pbls_by_hps) {
-
pbls_by_addrs[dsn::dns_resolver::instance().resolve_address(hp)] = pbls;
+ pbls_by_addrs[hp.resolve()] = pbls;
}
response.bulk_load_states[pidx] = pbls_by_addrs;
}
diff --git a/src/meta/meta_service.cpp b/src/meta/meta_service.cpp
index 09be67773..15b8b7b47 100644
--- a/src/meta/meta_service.cpp
+++ b/src/meta/meta_service.cpp
@@ -590,7 +590,7 @@ meta_leader_state
meta_service::check_leader(dsn::message_ex *req,
LOG_DEBUG("leader address: {}", leader);
if (leader) {
- dsn_rpc_forward(req,
dsn::dns_resolver::instance().resolve_address(leader));
+ dsn_rpc_forward(req, leader.resolve());
return meta_leader_state::kNotLeaderAndCanForwardRpc;
}
diff --git a/src/meta/meta_service.h b/src/meta/meta_service.h
index 2c6ca8966..2f10ddbed 100644
--- a/src/meta/meta_service.h
+++ b/src/meta/meta_service.h
@@ -46,7 +46,6 @@
#include "meta_options.h"
#include "meta_rpc_types.h"
#include "meta_server_failure_detector.h"
-#include "rpc/dns_resolver.h"
#include "rpc/network.h"
#include "rpc/rpc_host_port.h"
#include "rpc/rpc_message.h"
@@ -177,13 +176,13 @@ public:
}
virtual void send_message(const host_port &target, dsn::message_ex
*request)
{
-
dsn_rpc_call_one_way(dsn::dns_resolver::instance().resolve_address(target),
request);
+ dsn_rpc_call_one_way(target.resolve(), request);
}
virtual void send_request(dsn::message_ex * /*req*/,
const host_port &target,
const rpc_response_task_ptr &callback)
{
- dsn_rpc_call(dsn::dns_resolver::instance().resolve_address(target),
callback);
+ dsn_rpc_call(target.resolve(), callback);
}
// these two callbacks are running in fd's thread_pool, and in fd's lock
@@ -447,7 +446,7 @@ meta_leader_state meta_service::check_leader(TRpcHolder
rpc, host_port *forward_
}
if (leader) {
- rpc.forward(dsn::dns_resolver::instance().resolve_address(leader));
+ rpc.forward(leader.resolve());
return meta_leader_state::kNotLeaderAndCanForwardRpc;
}
diff --git a/src/meta/server_state.cpp b/src/meta/server_state.cpp
index 93f56cab0..2ac973e6e 100644
--- a/src/meta/server_state.cpp
+++ b/src/meta/server_state.cpp
@@ -63,7 +63,6 @@
#include "meta_bulk_load_service.h"
#include "metadata_types.h"
#include "replica_admin_types.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"
#include "rpc/rpc_message.h"
@@ -2188,25 +2187,26 @@ void
server_state::downgrade_secondary_to_inactive(std::shared_ptr<app_state> &a
int pidx,
const host_port &node)
{
- partition_configuration &pc = app->pcs[pidx];
- config_context &cc = app->helpers->contexts[pidx];
-
+ const partition_configuration &pc = app->pcs[pidx];
CHECK(pc.hp_primary, "this shouldn't be called if the primary is invalid");
- if (config_status::pending_remote_sync != cc.stage) {
- configuration_update_request request;
- request.info = *app;
- request.config = pc;
- request.type = config_type::CT_DOWNGRADE_TO_INACTIVE;
- SET_IP_AND_HOST_PORT_BY_DNS(request, node, node);
- host_port primary;
- GET_HOST_PORT(pc, primary, primary);
- send_proposal(primary, request);
- } else {
+
+ if (config_status::pending_remote_sync ==
app->helpers->contexts[pidx].stage) {
LOG_INFO("gpid({}.{}) is syncing with remote storage, ignore the
remove seconary({})",
app->app_id,
pidx,
node);
+ return;
}
+
+ configuration_update_request request;
+ request.info = *app;
+ request.config = pc;
+ request.type = config_type::CT_DOWNGRADE_TO_INACTIVE;
+ SET_IP_AND_HOST_PORT_BY_DNS(request, node, node);
+
+ host_port primary;
+ GET_HOST_PORT(pc, primary, primary);
+ send_proposal(primary, request);
}
void server_state::downgrade_stateless_nodes(std::shared_ptr<app_state> &app,
@@ -2216,7 +2216,7 @@ void
server_state::downgrade_stateless_nodes(std::shared_ptr<app_state> &app,
auto req = std::make_shared<configuration_update_request>();
req->info = *app;
req->type = config_type::CT_REMOVE;
- req->host_node = dsn::dns_resolver::instance().resolve_address(node);
+ req->host_node = node.resolve();
RESET_IP_AND_HOST_PORT(*req, node);
req->config = app->pcs[pidx];
@@ -2353,7 +2353,7 @@ void
server_state::on_partition_node_dead(std::shared_ptr<app_state> &app,
return;
}
- CHECK(is_secondary(pc, node), "");
+ CHECK(is_secondary(pc, node), "no primary/secondary on this node: node =
{}", node);
LOG_INFO("gpid({}): secondary({}) is down, ignored it due to no primary
for this partition "
"available",
pc.pid,
@@ -2578,34 +2578,32 @@ error_code server_state::construct_partitions(
pc.pid.get_partition_index(),
boost::lexical_cast<std::string>(pc));
if (pc.hp_last_drops.size() + 1 < pc.max_replica_count) {
- hint_message += fmt::format("WARNING: partition({}.{})
only collects {}/{} "
- "of replicas, may lost
data",
+ hint_message += fmt::format("WARNING: partition({}.{})
only collects "
+ "{}/{} of replicas, may
lost data\n",
app->app_id,
pc.pid.get_partition_index(),
pc.hp_last_drops.size() +
1,
pc.max_replica_count);
}
- succeed_count++;
+ ++succeed_count;
} else {
LOG_WARNING("construct partition({}.{}) failed",
app->app_id,
pc.pid.get_partition_index());
- std::ostringstream oss;
if (skip_lost_partitions) {
- oss << "WARNING: partition(" << app->app_id << "."
- << pc.pid.get_partition_index()
- << ") has no replica collected, force "
- "recover the lost partition to empty"
- << std::endl;
+ hint_message +=
+ fmt::format("WARNING: partition({}.{}) has no
replica collected, "
+ "force recover the lost partition to
empty\n",
+ app->app_id,
+ pc.pid.get_partition_index());
} else {
- oss << "ERROR: partition(" << app->app_id << "."
- << pc.pid.get_partition_index()
- << ") has no replica collected, you can force
recover it by set "
- "skip_lost_partitions option"
- << std::endl;
+ hint_message += fmt::format("ERROR: partition({}.{})
has no replica "
+ "collected, you can force
recover it by "
+ "set skip_lost_partitions
option\n",
+ app->app_id,
+
pc.pid.get_partition_index());
}
- hint_message += oss.str();
- failed_count++;
+ ++failed_count;
}
}
}
@@ -2644,7 +2642,7 @@ server_state::sync_apps_from_replica_nodes(const
std::vector<dsn::host_port> &re
SET_IP_AND_HOST_PORT(
*app_query_req, meta_server, dsn_primary_address(),
dsn_primary_host_port());
query_app_info_rpc app_rpc(std::move(app_query_req),
RPC_QUERY_APP_INFO);
- const auto addr =
dsn::dns_resolver::instance().resolve_address(replica_nodes[i]);
+ const auto addr = replica_nodes[i].resolve();
app_rpc.call(addr,
&tracker,
[app_rpc, i, &replica_nodes, &query_app_errors,
&query_app_responses](
diff --git a/src/meta/test/meta_bulk_load_service_test.cpp
b/src/meta/test/meta_bulk_load_service_test.cpp
index ffc973010..0d863149f 100644
--- a/src/meta/test/meta_bulk_load_service_test.cpp
+++ b/src/meta/test/meta_bulk_load_service_test.cpp
@@ -52,7 +52,6 @@
#include "meta_service_test_app.h"
#include "meta_test_base.h"
#include "metadata_types.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"
#include "utils/blob.h"
@@ -513,10 +512,10 @@ protected:
const host_port SECONDARY2_HP = host_port("localhost", 10087);
const host_port SECONDARY3_HP = host_port("localhost", 10080);
- const rpc_address PRIMARY =
dsn::dns_resolver::instance().resolve_address(PRIMARY_HP);
- const rpc_address SECONDARY1 =
dsn::dns_resolver::instance().resolve_address(SECONDARY1_HP);
- const rpc_address SECONDARY2 =
dsn::dns_resolver::instance().resolve_address(SECONDARY2_HP);
- const rpc_address SECONDARY3 =
dsn::dns_resolver::instance().resolve_address(SECONDARY3_HP);
+ const rpc_address PRIMARY = PRIMARY_HP.resolve();
+ const rpc_address SECONDARY1 = SECONDARY1_HP.resolve();
+ const rpc_address SECONDARY2 = SECONDARY2_HP.resolve();
+ const rpc_address SECONDARY3 = SECONDARY3_HP.resolve();
};
/// start bulk load unit tests
diff --git a/src/meta/test/meta_partition_guardian_test.cpp
b/src/meta/test/meta_partition_guardian_test.cpp
index cddf43e9e..11dc39943 100644
--- a/src/meta/test/meta_partition_guardian_test.cpp
+++ b/src/meta/test/meta_partition_guardian_test.cpp
@@ -55,7 +55,6 @@
#include "meta_service_test_app.h"
#include "meta_test_base.h"
#include "metadata_types.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"
#include "rpc/rpc_message.h"
@@ -235,7 +234,7 @@ void meta_partition_guardian_test::cure_test()
EXPECT_TRUE(is_secondary(pc, update_req->hp_node));
EXPECT_TRUE(is_secondary(pc, update_req->node));
EXPECT_EQ(target, update_req->hp_node);
- EXPECT_EQ(dsn::dns_resolver::instance().resolve_address(target),
update_req->node);
+ EXPECT_EQ(target.resolve(), update_req->node);
last_addr = update_req->hp_node;
proposal_sent = true;
@@ -258,9 +257,9 @@ void meta_partition_guardian_test::cure_test()
EXPECT_EQ(config_type::CT_UPGRADE_TO_PRIMARY, update_req->type);
EXPECT_EQ(update_req->hp_node, last_addr);
- EXPECT_EQ(update_req->node,
dsn::dns_resolver::instance().resolve_address(last_addr));
+ EXPECT_EQ(update_req->node, last_addr.resolve());
EXPECT_EQ(target, update_req->hp_node);
- EXPECT_EQ(dsn::dns_resolver::instance().resolve_address(target),
update_req->node);
+ EXPECT_EQ(target.resolve(), update_req->node);
proposal_sent = true;
apply_update_request(*update_req);
@@ -299,7 +298,7 @@ void meta_partition_guardian_test::cure_test()
EXPECT_TRUE(is_secondary(pc, update_req->hp_node));
EXPECT_TRUE(is_secondary(pc, update_req->node));
EXPECT_EQ(target, update_req->hp_node);
- EXPECT_EQ(dsn::dns_resolver::instance().resolve_address(target),
update_req->node);
+ EXPECT_EQ(target.resolve(), update_req->node);
proposal_sent = true;
last_addr = update_req->hp_node;
@@ -325,7 +324,7 @@ void meta_partition_guardian_test::cure_test()
EXPECT_TRUE(is_secondary(pc, update_req->hp_node));
EXPECT_TRUE(is_secondary(pc, update_req->node));
EXPECT_EQ(target, update_req->hp_node);
- EXPECT_EQ(dsn::dns_resolver::instance().resolve_address(target),
update_req->node);
+ EXPECT_EQ(target.resolve(), update_req->node);
EXPECT_NE(target, last_addr);
proposal_sent = true;
@@ -386,7 +385,7 @@ void meta_partition_guardian_test::cure_test()
EXPECT_EQ(update_req->type, config_type::CT_ADD_SECONDARY);
EXPECT_EQ(update_req->hp_node, last_addr);
- EXPECT_EQ(update_req->node,
dsn::dns_resolver::instance().resolve_address(last_addr));
+ EXPECT_EQ(update_req->node, last_addr.resolve());
EXPECT_EQ(target, nodes[0]);
proposal_sent = true;
@@ -531,7 +530,7 @@ void meta_partition_guardian_test::cure_test()
EXPECT_FALSE(is_secondary(pc, update_req->hp_node));
EXPECT_FALSE(is_secondary(pc, update_req->node));
EXPECT_EQ(target, pc.hp_primary);
- EXPECT_EQ(dsn::dns_resolver::instance().resolve_address(target),
pc.primary);
+ EXPECT_EQ(target.resolve(), pc.primary);
proposal_sent = true;
svc->set_node_state({pc.hp_primary}, false);
diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp
b/src/replica/bulk_load/replica_bulk_loader.cpp
index 3ed17b74a..8c5435106 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -37,7 +37,6 @@
#include "replica/replica_stub.h"
#include "replica/replication_app_base.h"
#include "replica_bulk_loader.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_holder.h"
#include "rpc/rpc_host_port.h"
@@ -196,7 +195,7 @@ void replica_bulk_loader::broadcast_group_bulk_load(const
bulk_load_request &met
auto request = std::make_unique<group_bulk_load_request>();
request->app_name = _replica->_app_info.app_name;
- const auto &addr =
dsn::dns_resolver::instance().resolve_address(secondary);
+ const auto &addr = secondary.resolve();
SET_IP_AND_HOST_PORT(*request, target, addr, secondary);
_replica->_primary_states.get_replica_config(partition_status::PS_SECONDARY,
request->config);
diff --git a/src/replica/duplication/replica_follower.cpp
b/src/replica/duplication/replica_follower.cpp
index 9aa5628b1..fa5819a9c 100644
--- a/src/replica/duplication/replica_follower.cpp
+++ b/src/replica/duplication/replica_follower.cpp
@@ -33,7 +33,6 @@
#include "nfs/nfs_node.h"
#include "replica/replica.h"
#include "replica/replica_stub.h"
-#include "rpc/dns_resolver.h"
#include "rpc/group_host_port.h"
#include "rpc/rpc_host_port.h"
#include "rpc/rpc_message.h"
@@ -128,7 +127,7 @@ void
replica_follower::async_duplicate_checkpoint_from_master_replica()
dsn::message_ex *msg = dsn::message_ex::create_request(
RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX, 0, get_gpid().thread_hash());
dsn::marshall(msg, meta_config_request);
- rpc::call(dsn::dns_resolver::instance().resolve_address(meta_servers),
+ rpc::call(meta_servers.resolve(),
msg,
&_tracker,
[&](error_code err, query_cfg_response &&resp) mutable {
diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp
index 5d8dc5adb..7c445a26c 100644
--- a/src/replica/replica_2pc.cpp
+++ b/src/replica/replica_2pc.cpp
@@ -57,7 +57,6 @@
#include "replica/replica_context.h"
#include "replica/replication_app_base.h"
#include "replica_stub.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_host_port.h"
#include "rpc/rpc_message.h"
#include "rpc/rpc_stream.h"
@@ -506,7 +505,7 @@ void replica::send_prepare_message(const host_port &hp,
}
mu->remote_tasks()[hp] = rpc::call(
- dsn::dns_resolver::instance().resolve_address(hp),
+ hp.resolve(),
msg,
&_tracker,
[=](error_code err, dsn::message_ex *request, dsn::message_ex *reply) {
diff --git a/src/replica/replica_check.cpp b/src/replica/replica_check.cpp
index c4f4c01e1..c48eb09d7 100644
--- a/src/replica/replica_check.cpp
+++ b/src/replica/replica_check.cpp
@@ -47,7 +47,6 @@
#include "replica/replica_context.h"
#include "replica/replication_app_base.h"
#include "replica_stub.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"
#include "runtime/api_layer1.h"
@@ -130,7 +129,7 @@ void replica::broadcast_group_check()
std::shared_ptr<group_check_request> request(new group_check_request);
request->app = _app_info;
- const auto addr = dsn::dns_resolver::instance().resolve_address(hp);
+ const auto addr = hp.resolve();
SET_IP_AND_HOST_PORT(*request, node, addr, hp);
_primary_states.get_replica_config(it->second, request->config);
request->last_committed_decree = last_committed_decree();
diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp
index 6a4080f66..2d7b1232f 100644
--- a/src/replica/replica_config.cpp
+++ b/src/replica/replica_config.cpp
@@ -60,7 +60,6 @@
#include "replica/replica_context.h"
#include "replica/replication_app_base.h"
#include "replica_stub.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"
#include "rpc/rpc_message.h"
@@ -246,10 +245,8 @@ void replica::add_potential_secondary(const
configuration_update_request &propos
FMT_HOST_PORT_AND_IP(proposal, node),
state.signature);
-
rpc::call_one_way_typed(dsn::dns_resolver::instance().resolve_address(node),
- RPC_LEARN_ADD_LEARNER,
- request,
- get_gpid().thread_hash());
+ rpc::call_one_way_typed(
+ node.resolve(), RPC_LEARN_ADD_LEARNER, request,
get_gpid().thread_hash());
}
void replica::upgrade_to_secondary_on_primary(const ::dsn::host_port &node)
@@ -417,8 +414,7 @@ void
replica::update_configuration_on_meta_server(config_type::type type,
enum_to_string(request->type),
FMT_HOST_PORT_AND_IP(*request, node));
- rpc_address target(
-
dsn::dns_resolver::instance().resolve_address(_stub->_failure_detector->get_servers()));
+ rpc_address target(_stub->_failure_detector->get_servers().resolve());
_primary_states.reconfiguration_task = rpc::call(
target,
msg,
@@ -459,8 +455,7 @@ void replica::on_update_configuration_on_meta_server_reply(
LPC_DELAY_UPDATE_CONFIG,
&_tracker,
[this, request, req2 = std::move(req)]() {
- rpc_address
target(dsn::dns_resolver::instance().resolve_address(
- _stub->_failure_detector->get_servers()));
+ rpc_address
target(_stub->_failure_detector->get_servers().resolve());
rpc_response_task_ptr t = rpc::create_rpc_response_task(
request,
&_tracker,
@@ -517,10 +512,8 @@ void replica::on_update_configuration_on_meta_server_reply(
CHECK_NE(req->node, _stub->primary_address());
replica_configuration rconfig;
replica_helper::get_replica_config(resp.config, node, rconfig);
-
rpc::call_one_way_typed(dsn::dns_resolver::instance().resolve_address(node),
- RPC_REMOVE_REPLICA,
- rconfig,
- get_gpid().thread_hash());
+ rpc::call_one_way_typed(
+ node.resolve(), RPC_REMOVE_REPLICA, rconfig,
get_gpid().thread_hash());
}
break;
}
diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp
index cbb886608..5c8cadac8 100644
--- a/src/replica/replica_learn.cpp
+++ b/src/replica/replica_learn.cpp
@@ -55,7 +55,6 @@
#include "replica/replica_context.h"
#include "replica/replication_app_base.h"
#include "replica_stub.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"
#include "rpc/rpc_message.h"
@@ -259,9 +258,10 @@ void replica::init_learn(uint64_t signature)
host_port primary;
GET_HOST_PORT(_config, primary, primary);
_potential_secondary_states.learning_task = rpc::call(
- dsn::dns_resolver::instance().resolve_address(primary),
+ primary.resolve(),
msg,
&_tracker,
+ // NOLINTNEXTLINE(hicpp-move-const-arg,performance-move-const-arg)
[this, req_cap = std::move(request)](error_code err, learn_response
&&resp) mutable {
on_learn_reply(err, std::move(req_cap), std::move(resp));
});
@@ -1307,9 +1307,10 @@ void replica::notify_learn_completion()
host_port primary;
GET_HOST_PORT(_config, primary, primary);
_potential_secondary_states.completion_notify_task = rpc::call(
- dsn::dns_resolver::instance().resolve_address(primary),
+ primary.resolve(),
msg,
&_tracker,
+ // NOLINTNEXTLINE(hicpp-move-const-arg,performance-move-const-arg)
[this, report = std::move(report)](error_code err,
learn_notify_response &&resp) mutable {
on_learn_completion_notification_reply(err, std::move(report),
std::move(resp));
});
diff --git a/src/replica/replica_restore.cpp b/src/replica/replica_restore.cpp
index 00b260dc0..85755646b 100644
--- a/src/replica/replica_restore.cpp
+++ b/src/replica/replica_restore.cpp
@@ -39,7 +39,7 @@
#include "metadata_types.h"
#include "replica.h"
#include "replica_stub.h"
-#include "rpc/dns_resolver.h"
+#include "rpc/rpc_host_port.h"
#include "rpc/rpc_message.h"
#include "rpc/serialization.h"
#include "task/async_calls.h"
@@ -404,8 +404,7 @@ void replica::tell_meta_to_restore_rollback()
dsn::message_ex *msg = dsn::message_ex::create_request(RPC_CM_DROP_APP);
::dsn::marshall(msg, request);
- const auto &target =
-
dsn::dns_resolver::instance().resolve_address(_stub->_failure_detector->get_servers());
+ const auto &target = _stub->_failure_detector->get_servers().resolve();
rpc::call(target,
msg,
&_tracker,
@@ -434,8 +433,7 @@ void replica::report_restore_status_to_meta()
dsn::message_ex *msg =
dsn::message_ex::create_request(RPC_CM_REPORT_RESTORE_STATUS);
::dsn::marshall(msg, request);
- const auto &target =
-
dsn::dns_resolver::instance().resolve_address(_stub->_failure_detector->get_servers());
+ const auto &target = _stub->_failure_detector->get_servers().resolve();
rpc::call(target,
msg,
&_tracker,
diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp
index 7f0b6cfed..efb746d21 100644
--- a/src/replica/replica_stub.cpp
+++ b/src/replica/replica_stub.cpp
@@ -1550,8 +1550,7 @@ void replica_stub::query_configuration_by_node()
LOG_INFO("send query node partitions request to meta server, stored_replicas_count = {}",
req.stored_replicas.size());
- const auto &target =
- dsn::dns_resolver::instance().resolve_address(_failure_detector->get_servers());
+ const auto &target = _failure_detector->get_servers().resolve();
_config_query_task =
rpc::call(target,
msg,
@@ -1757,16 +1756,14 @@ void replica_stub::remove_replica_on_meta_server(const
app_info &info,
GET_HOST_PORT(pc, primary, primary);
if (_primary_host_port == primary) {
RESET_IP_AND_HOST_PORT(request->config, primary);
- } else if (REMOVE_IP_AND_HOST_PORT(
+ } else if (!REMOVE_IP_AND_HOST_PORT(
primary_address(), _primary_host_port, request->config,
secondaries)) {
- } else {
return;
}
::dsn::marshall(msg, *request);
- const auto &target =
- dsn::dns_resolver::instance().resolve_address(_failure_detector->get_servers());
+ const auto &target = _failure_detector->get_servers().resolve();
rpc::call(target, msg, nullptr, [](error_code err, dsn::message_ex *,
dsn::message_ex *) {});
}
@@ -1775,8 +1772,9 @@ void replica_stub::on_meta_server_disconnected()
LOG_INFO("meta server disconnected");
zauto_lock sl(_state_lock);
- if (NS_Disconnected == _state)
+ if (NS_Disconnected == _state) {
return;
+ }
_state = NS_Disconnected;
diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h
index 10f831949..e3d7b9706 100644
--- a/src/replica/replica_stub.h
+++ b/src/replica/replica_stub.h
@@ -56,7 +56,6 @@
#include "replica.h"
#include "replica/mutation_log.h"
#include "replica_admin_types.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_holder.h"
#include "rpc/rpc_host_port.h"
@@ -219,12 +218,9 @@ public:
bool is_connected() const { return NS_Connected == _state; }
virtual rpc_address get_meta_server_address() const
{
- return dsn::dns_resolver::instance().resolve_address(_failure_detector->get_servers());
- }
- rpc_address primary_address() const
- {
- return dsn::dns_resolver::instance().resolve_address(_primary_host_port);
+ return _failure_detector->get_servers().resolve();
}
+ rpc_address primary_address() const { return _primary_host_port.resolve(); }
const host_port &primary_host_port() const { return _primary_host_port; }
//
diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp
index 4afd5df97..f7c9a3190 100644
--- a/src/replica/split/replica_split_manager.cpp
+++ b/src/replica/split/replica_split_manager.cpp
@@ -35,7 +35,6 @@
#include "replica/replica_context.h"
#include "replica/replica_stub.h"
#include "replica/replication_app_base.h"
-#include "rpc/dns_resolver.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_holder.h"
#include "rpc/rpc_host_port.h"
@@ -632,34 +631,31 @@ void replica_split_manager::child_notify_catch_up() // on
child partition
_replica->_split_states.parent_gpid.thread_hash());
host_port primary;
GET_HOST_PORT(_replica->_config, primary, primary);
- rpc.call(dsn::dns_resolver::instance().resolve_address(primary),
- tracker(),
- [this, rpc](error_code ec) mutable {
- auto response = rpc.response();
- if (ec == ERR_TIMEOUT) {
- LOG_WARNING_PREFIX("notify primary catch up timeout, please wait and retry");
- tasking::enqueue(
- LPC_PARTITION_SPLIT,
- tracker(),
- std::bind(&replica_split_manager::child_notify_catch_up, this),
- get_gpid().thread_hash(),
- std::chrono::seconds(1));
- return;
- }
- if (ec != ERR_OK || response.err != ERR_OK) {
- error_code err = (ec == ERR_OK) ? response.err : ec;
- LOG_ERROR_PREFIX("failed to notify primary catch up, error={}", err);
- _stub->split_replica_error_handler(
- _replica->_split_states.parent_gpid,
- std::bind(&replica_split_manager::parent_cleanup_split_context,
- std::placeholders::_1));
- child_handle_split_error("notify_primary_split_catch_up failed");
- return;
- }
- LOG_INFO_PREFIX("notify primary parent[{}@{}] catch up succeed",
- _replica->_split_states.parent_gpid,
- FMT_HOST_PORT_AND_IP(_replica->_config, primary));
- });
+ rpc.call(primary.resolve(), tracker(), [this, rpc](error_code ec) mutable {
+ auto response = rpc.response();
+ if (ec == ERR_TIMEOUT) {
+ LOG_WARNING_PREFIX("notify primary catch up timeout, please wait and retry");
+ tasking::enqueue(LPC_PARTITION_SPLIT,
+ tracker(),
+ std::bind(&replica_split_manager::child_notify_catch_up, this),
+ get_gpid().thread_hash(),
+ std::chrono::seconds(1));
+ return;
+ }
+ if (ec != ERR_OK || response.err != ERR_OK) {
+ error_code err = (ec == ERR_OK) ? response.err : ec;
+ LOG_ERROR_PREFIX("failed to notify primary catch up, error={}", err);
+ _stub->split_replica_error_handler(
+ _replica->_split_states.parent_gpid,
+ std::bind(&replica_split_manager::parent_cleanup_split_context,
+ std::placeholders::_1));
+ child_handle_split_error("notify_primary_split_catch_up failed");
+ return;
+ }
+ LOG_INFO_PREFIX("notify primary parent[{}@{}] catch up succeed",
+ _replica->_split_states.parent_gpid,
+ FMT_HOST_PORT_AND_IP(_replica->_config, primary));
+ });
}
// ThreadPool: THREAD_POOL_REPLICATION
@@ -808,8 +804,12 @@ void
replica_split_manager::parent_send_update_partition_count_request(
CHECK_EQ_PREFIX(status(), partition_status::PS_PRIMARY);
auto request = std::make_unique<update_child_group_partition_count_request>();
- request->new_partition_count = new_partition_count;
+
SET_IP_AND_HOST_PORT_BY_DNS(*request, target, hp);
+ DCHECK(request->hp_target, "");
+ DCHECK_EQ(request->target, request->hp_target.resolve());
+
+ request->new_partition_count = new_partition_count;
request->child_pid = _child_gpid;
request->ballot = get_ballot();
@@ -817,14 +817,14 @@ void
replica_split_manager::parent_send_update_partition_count_request(
"send update child group partition count request to node({}), new partition_count = {}",
hp,
new_partition_count);
+
+ const auto target = request->target;
update_child_group_partition_count_rpc rpc(std::move(request),
RPC_SPLIT_UPDATE_CHILD_PARTITION_COUNT,
0_ms,
0,
get_gpid().thread_hash());
- DCHECK(request->hp_target, "");
- DCHECK_EQ(request->target, dsn::dns_resolver::instance().resolve_address(request->hp_target));
- rpc.call(request->target, tracker(), [this, rpc, not_replied_addresses](error_code ec) mutable {
+ rpc.call(target, tracker(), [this, rpc, not_replied_addresses](error_code ec) mutable {
on_update_child_group_partition_count_reply(
ec, rpc.request(), rpc.response(), not_replied_addresses);
});
@@ -920,7 +920,7 @@ void
replica_split_manager::on_update_child_group_partition_count_reply(
FMT_HOST_PORT_AND_IP(request, target),
error);
DCHECK(request.hp_target, "");
- DCHECK_EQ(request.target, dsn::dns_resolver::instance().resolve_address(request.hp_target));
+ DCHECK_EQ(request.target, request.hp_target.resolve());
tasking::enqueue(
LPC_PARTITION_SPLIT,
tracker(),
@@ -1022,8 +1022,7 @@ void replica_split_manager::parent_send_register_request(
request.parent_config.ballot,
request.child_config.ballot);
- rpc_address meta_address(
- dsn::dns_resolver::instance().resolve_address(_stub->_failure_detector->get_servers()));
+ rpc_address meta_address(_stub->_failure_detector->get_servers().resolve());
std::unique_ptr<register_child_request> req =
std::make_unique<register_child_request>(request);
register_child_rpc rpc(std::move(req),
RPC_CM_REGISTER_CHILD_REPLICA,
@@ -1518,8 +1517,7 @@ void
replica_split_manager::parent_send_notify_stop_request(
split_status::type meta_split_status) // on primary parent
{
FAIL_POINT_INJECT_F("replica_parent_send_notify_stop_request",
[](std::string_view) {});
- auto meta_address =
- dsn::dns_resolver::instance().resolve_address(_stub->_failure_detector->get_servers());
+ auto meta_address = _stub->_failure_detector->get_servers().resolve();
std::unique_ptr<notify_stop_split_request> req =
std::make_unique<notify_stop_split_request>();
req->app_name = _replica->_app_info.app_name;
req->parent_gpid = get_gpid();
@@ -1550,8 +1548,7 @@ void replica_split_manager::query_child_state() // on
primary parent
request->pid = get_gpid();
request->partition_count = _replica->_app_info.partition_count;
- rpc_address meta_address(
- dsn::dns_resolver::instance().resolve_address(_stub->_failure_detector->get_servers()));
+ rpc_address meta_address(_stub->_failure_detector->get_servers().resolve());
LOG_INFO_PREFIX("send query child partition state request to meta
server({})", meta_address);
query_child_state_rpc rpc(
std::move(request), RPC_CM_QUERY_CHILD_STATE, 0_ms, 0,
get_gpid().thread_hash());
diff --git a/src/replica/storage/simple_kv/test/client.cpp b/src/replica/storage/simple_kv/test/client.cpp
index 23b20c6b4..3dd8e4b22 100644
--- a/src/replica/storage/simple_kv/test/client.cpp
+++ b/src/replica/storage/simple_kv/test/client.cpp
@@ -37,7 +37,6 @@
#include "common/replication_other_types.h"
#include "replica/storage/simple_kv/simple_kv.client.h"
#include "replica/storage/simple_kv/test/common.h"
-#include "rpc/dns_resolver.h"
#include "rpc/group_host_port.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_message.h"
@@ -167,27 +166,16 @@ void simple_kv_client_app::send_config_to_meta(const
host_port &receiver,
dsn::marshall(req, request);
-
dsn_rpc_call_one_way(dsn::dns_resolver::instance().resolve_address(_meta_server_group),
req);
+ dsn_rpc_call_one_way(_meta_server_group.resolve(), req);
}
-struct read_context
-{
- int id;
- std::string key;
- int timeout_ms;
-};
-
void simple_kv_client_app::begin_read(int id, const std::string &key, int
timeout_ms)
{
LOG_INFO("=== on_begin_read:id={},key={},timeout={}", id, key, timeout_ms);
- std::shared_ptr<read_context> ctx(new read_context());
- ctx->id = id;
- ctx->key = key;
- ctx->timeout_ms = timeout_ms;
_simple_kv_client->read(
key,
- [ctx](error_code err, std::string &&resp) {
- test_case::instance().on_end_read(ctx->id, err, resp);
+ [id](error_code err, std::string &&resp) {
+ test_case::instance().on_end_read(id, err, resp);
},
std::chrono::milliseconds(timeout_ms));
}
diff --git a/src/rpc/dns_resolver.cpp b/src/rpc/dns_resolver.cpp
index 9ddc10a7f..3c9e47992 100644
--- a/src/rpc/dns_resolver.cpp
+++ b/src/rpc/dns_resolver.cpp
@@ -27,7 +27,6 @@
#include "fmt/format.h"
#include "rpc/dns_resolver.h"
#include "rpc/group_address.h"
-#include "rpc/group_host_port.h"
#include "utils/autoref_ptr.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
@@ -57,7 +56,7 @@ dns_resolver::dns_resolver()
{
#ifndef MOCK_TEST
static int only_one_instance = 0;
- only_one_instance++;
+ ++only_one_instance;
CHECK_EQ_MSG(1, only_one_instance, "dns_resolver should only created
once!");
#endif
}
@@ -108,23 +107,27 @@ error_s dns_resolver::resolve_addresses(const host_port
&hp, std::vector<rpc_add
return error_s::ok();
}
+// NOLINTNEXTLINE(misc-no-recursion)
+rpc_address dns_resolver::resolve_address(const rpc_group_host_port &group)
+{
+ rpc_address addr;
+ addr.assign_group(group.name());
+
+ for (const auto &member : group.members()) {
+ CHECK_TRUE(addr.group_address()->add(resolve_address(member)));
+ }
+ addr.group_address()->set_update_leader_automatically(group.is_update_leader_automatically());
+ addr.group_address()->set_leader(resolve_address(group.leader()));
+ return addr;
+}
+
+// NOLINTNEXTLINE(misc-no-recursion)
rpc_address dns_resolver::resolve_address(const host_port &hp)
{
METRIC_VAR_AUTO_LATENCY(dns_resolver_resolve_duration_ns);
switch (hp.type()) {
- case HOST_TYPE_GROUP: {
- rpc_address addr;
- auto hp_group = hp.group_host_port();
- addr.assign_group(hp_group->name());
-
- for (const auto &hp : hp_group->members()) {
- CHECK_TRUE(addr.group_address()->add(resolve_address(hp)));
- }
- addr.group_address()->set_update_leader_automatically(
- hp_group->is_update_leader_automatically());
- addr.group_address()->set_leader(resolve_address(hp_group->leader()));
- return addr;
- }
+ case HOST_TYPE_GROUP:
+ return resolve_address(*hp.group_host_port());
case HOST_TYPE_IPV4: {
std::vector<rpc_address> addresses;
CHECK_OK(resolve_addresses(hp, addresses), "host_port '{}' can not be
resolved", hp);
@@ -155,7 +158,7 @@ std::string dns_resolver::ip_ports_from_host_ports(const
std::string &host_ports
std::vector<std::string> ip_port_vec;
ip_port_vec.reserve(host_port_vec.size());
for (const auto &hp : host_port_vec) {
- const auto addr = dsn::dns_resolver::instance().resolve_address(host_port::from_string(hp));
+ const auto addr = host_port::from_string(hp).resolve();
ip_port_vec.emplace_back(addr.to_string());
}
diff --git a/src/rpc/dns_resolver.h b/src/rpc/dns_resolver.h
index e80640c13..25a06ec3b 100644
--- a/src/rpc/dns_resolver.h
+++ b/src/rpc/dns_resolver.h
@@ -23,10 +23,12 @@
#include <unordered_map>
#include <vector>
+#include "rpc/group_host_port.h"
#include "rpc/rpc_address.h"
#include "rpc/rpc_host_port.h"
#include "utils/errors.h"
#include "utils/metrics.h"
+#include "utils/ports.h"
#include "utils/singleton.h"
#include "utils/synchronize.h"
@@ -42,9 +44,6 @@ namespace dsn {
class dns_resolver : public utils::singleton<dns_resolver>
{
public:
- // Resolve this host_port to an unique rpc_address.
- rpc_address resolve_address(const host_port &hp);
-
// Resolve comma separated host:port list 'host_ports' to comma separated
ip:port list.
static std::string ip_ports_from_host_ports(const std::string &host_ports);
@@ -53,6 +52,13 @@ private:
~dns_resolver() = default;
friend class utils::singleton<dns_resolver>;
+ friend class host_port;
+
+ // Resolve the host_port object 'hp' into an rpc_address(i.e. a group or
an ip).
+ rpc_address resolve_address(const host_port &hp);
+
+ // Resolve the host_port group object into an rpc_address(i.e. a group).
+ rpc_address resolve_address(const rpc_group_host_port &group);
bool get_cached_addresses(const host_port &hp, std::vector<rpc_address>
&addresses);
@@ -66,6 +72,9 @@ private:
METRIC_VAR_DECLARE_gauge_int64(dns_resolver_cache_size);
METRIC_VAR_DECLARE_percentile_int64(dns_resolver_resolve_duration_ns);
METRIC_VAR_DECLARE_percentile_int64(dns_resolver_resolve_by_dns_duration_ns);
+
+ DISALLOW_COPY_AND_ASSIGN(dns_resolver);
+ DISALLOW_MOVE_AND_ASSIGN(dns_resolver);
};
} // namespace dsn
diff --git a/src/rpc/rpc_host_port.cpp b/src/rpc/rpc_host_port.cpp
index e0b35e63c..0d4201518 100644
--- a/src/rpc/rpc_host_port.cpp
+++ b/src/rpc/rpc_host_port.cpp
@@ -162,13 +162,15 @@ void host_port::assign_group(const char *name)
_group_host_port = std::make_shared<rpc_group_host_port>(name);
}
+rpc_address host_port::resolve() const { return dns_resolver::instance().resolve_address(*this); }
+
std::string host_port::resolve(bool resolve_ip) const
{
if (!resolve_ip) {
return to_string();
}
- return dns_resolver::instance().resolve_address(*this).to_string();
+ return resolve().to_string();
}
error_s host_port::resolve_addresses(std::vector<rpc_address> &addresses) const
diff --git a/src/rpc/rpc_host_port.h b/src/rpc/rpc_host_port.h
index a0ba48a9f..f3e38cdd6 100644
--- a/src/rpc/rpc_host_port.h
+++ b/src/rpc/rpc_host_port.h
@@ -52,7 +52,7 @@ class TProtocol;
auto &_target = (target);
\
if (_obj.__isset.hp_##field) {
\
DCHECK(_obj.field, "invalid address: {}", _obj.field);
\
- DCHECK_EQ(_obj.field,
dsn::dns_resolver::instance().resolve_address(_obj.hp_##field)); \
+ DCHECK_EQ(_obj.field, _obj.hp_##field.resolve());
\
_target = _obj.hp_##field;
\
} else {
\
_target = std::move(dsn::host_port::from_address(_obj.field));
\
@@ -85,7 +85,7 @@ class TProtocol;
auto &_obj = (obj);
\
const auto &_addr = (addr);
\
const auto &_hp = (hp);
\
- DCHECK_EQ(_addr, dsn::dns_resolver::instance().resolve_address(_hp));
\
+ DCHECK_EQ(_addr, _hp.resolve());
\
_obj.field = _addr;
\
_obj.__set_hp_##field(_hp);
\
} while (0)
@@ -97,7 +97,7 @@ class TProtocol;
const auto &_obj = (obj);
\
const auto &_addr = (addr);
\
const auto &_hp = (hp);
\
- ASSERT_EQ(_addr, dsn::dns_resolver::instance().resolve_address(_hp));
\
+ ASSERT_EQ(_addr, _hp.resolve());
\
ASSERT_EQ(_addr, _obj.field);
\
ASSERT_EQ(_hp, _obj.hp_##field);
\
} while (0)
@@ -109,8 +109,7 @@ class TProtocol;
do {
\
const auto &_src_obj = (src_obj);
\
auto &_dst_obj = (dst_obj);
\
- DCHECK_EQ(_src_obj.src_field,
\
-
dsn::dns_resolver::instance().resolve_address(_src_obj.hp_##src_field));
\
+ DCHECK_EQ(_src_obj.src_field, _src_obj.hp_##src_field.resolve());
\
_dst_obj.dst_field = _src_obj.src_field;
\
_dst_obj.__set_hp_##dst_field(_src_obj.hp_##src_field);
\
} while (0)
@@ -121,7 +120,7 @@ class TProtocol;
do {
\
auto &_obj = (obj);
\
const auto &_hp = (hp);
\
- _obj.field = dsn::dns_resolver::instance().resolve_address(_hp);
\
+ _obj.field = _hp.resolve();
\
_obj.__set_hp_##field(_hp);
\
} while (0)
@@ -149,7 +148,7 @@ class TProtocol;
do {
\
const auto &_addr = (addr);
\
const auto &_hp = (hp);
\
- DCHECK_EQ(_addr, dsn::dns_resolver::instance().resolve_address(_hp));
\
+ DCHECK_EQ(_addr, _hp.resolve());
\
auto &_obj = (obj);
\
_obj.field.push_back(_addr);
\
if (!_obj.__isset.hp_##field) {
\
@@ -166,7 +165,7 @@ class TProtocol;
do {
\
auto &_obj = (obj);
\
const auto &_hp = (hp);
\
-
_obj.field.push_back(dsn::dns_resolver::instance().resolve_address(_hp));
\
+ _obj.field.push_back(_hp.resolve());
\
if (!_obj.__isset.hp_##field) {
\
_obj.__set_hp_##field({_hp});
\
} else {
\
@@ -179,7 +178,7 @@ class TProtocol;
do {
\
auto &_obj = (obj);
\
const auto &_hp1 = (hp1);
\
- _obj.field = {dsn::dns_resolver::instance().resolve_address(_hp1)};
\
+ _obj.field = {_hp1.resolve()};
\
_obj.__set_hp_##field({_hp1});
\
} while (0)
#define SET_IPS_AND_HOST_PORTS_BY_DNS_2(obj, field, hp1, hp2)
\
@@ -187,8 +186,7 @@ class TProtocol;
auto &_obj = (obj);
\
const auto &_hp1 = (hp1);
\
const auto &_hp2 = (hp2);
\
- _obj.field = {dsn::dns_resolver::instance().resolve_address(_hp1),
\
- dsn::dns_resolver::instance().resolve_address(_hp2)};
\
+ _obj.field = {_hp1.resolve(), _hp2.resolve()};
\
_obj.__set_hp_##field({_hp1, _hp2});
\
} while (0)
#define SET_IPS_AND_HOST_PORTS_BY_DNS_3(obj, field, hp1, hp2, hp3)
\
@@ -197,9 +195,7 @@ class TProtocol;
const auto &_hp1 = (hp1);
\
const auto &_hp2 = (hp2);
\
const auto &_hp3 = (hp3);
\
- _obj.field = {dsn::dns_resolver::instance().resolve_address(_hp1),
\
- dsn::dns_resolver::instance().resolve_address(_hp2),
\
- dsn::dns_resolver::instance().resolve_address(_hp3)};
\
+ _obj.field = {_hp1.resolve(), _hp2.resolve(), _hp3.resolve()};
\
_obj.__set_hp_##field({_hp1, _hp2, _hp3});
\
} while (0)
#define SET_IPS_AND_HOST_PORTS_BY_DNS_GET_MACRO(hp1, hp2, hp3, NAME, ...) NAME
@@ -223,7 +219,7 @@ class TProtocol;
do {
\
auto &_obj = (obj);
\
const auto &_hp = (hp);
\
- _obj.field.insert(_obj.field.begin(),
dsn::dns_resolver::instance().resolve_address(_hp)); \
+ _obj.field.insert(_obj.field.begin(), _hp.resolve());
\
if (!_obj.__isset.hp_##field) {
\
_obj.__set_hp_##field({_hp});
\
} else {
\
@@ -247,7 +243,7 @@ class TProtocol;
do {
\
const auto &_hp = (hp);
\
const auto &_addr = (addr);
\
- DCHECK_EQ(_addr, dsn::dns_resolver::instance().resolve_address(_hp));
\
+ DCHECK_EQ(_addr, _hp.resolve());
\
auto &_obj = (obj);
\
const auto &_value = (value);
\
_obj.field[_addr] = _value;
\
@@ -264,7 +260,7 @@ class TProtocol;
#define SET_VALUE_FROM_HOST_PORT(obj, field, hp, value)
\
do {
\
const auto &__hp = (hp);
\
- const auto addr = dsn::dns_resolver::instance().resolve_address(__hp);
\
+ const auto addr = __hp.resolve();
\
SET_VALUE_FROM_IP_AND_HOST_PORT(obj, field, addr, __hp, value);
\
} while (0)
@@ -305,6 +301,9 @@ public:
}
void assign_group(const char *name);
+ // Resolve this host_port object to ip:port.
+ [[nodiscard]] rpc_address resolve() const;
+
// Resolve this host_port object to the string of ip:port if 'resolve_ip'
is true;
// otherwise, just return the string of this host_port object.
[[nodiscard]] std::string resolve(bool resolve_ip) const;
diff --git a/src/rpc/test/host_port_test.cpp b/src/rpc/test/host_port_test.cpp
index 8fdb7a4e2..35bff2770 100644
--- a/src/rpc/test/host_port_test.cpp
+++ b/src/rpc/test/host_port_test.cpp
@@ -31,7 +31,6 @@
#include "fd_types.h"
#include "gtest/gtest.h"
#include "meta_admin_types.h"
-#include "rpc/dns_resolver.h"
#include "rpc/group_address.h"
#include "rpc/group_host_port.h"
#include "rpc/rpc_address.h"
@@ -283,9 +282,9 @@ TEST(host_port_test, test_macros)
static const host_port kHp2("localhost", 8082);
static const host_port kHp3("localhost", 8083);
static const std::vector<host_port> kHps({kHp1, kHp2, kHp3});
- static const rpc_address kAddr1 = dns_resolver::instance().resolve_address(kHp1);
- static const rpc_address kAddr2 = dns_resolver::instance().resolve_address(kHp2);
- static const rpc_address kAddr3 = dns_resolver::instance().resolve_address(kHp3);
+ static const rpc_address kAddr1 = kHp1.resolve();
+ static const rpc_address kAddr2 = kHp2.resolve();
+ static const rpc_address kAddr3 = kHp3.resolve();
static const std::vector<rpc_address> kAddres({kAddr1, kAddr2, kAddr3});
// Test GET_HOST_PORT-1.
@@ -303,7 +302,7 @@ TEST(host_port_test, test_macros)
GET_HOST_PORT(beacon, from_node, hp_from_node);
ASSERT_TRUE(hp_from_node);
ASSERT_EQ(kHp1, hp_from_node);
- ASSERT_EQ(kAddr1, dns_resolver::instance().resolve_address(hp_from_node));
+ ASSERT_EQ(kAddr1, hp_from_node.resolve());
}
// Test GET_HOST_PORT-3.
{
@@ -314,7 +313,7 @@ TEST(host_port_test, test_macros)
GET_HOST_PORT(beacon, from_node, hp_from_node);
ASSERT_TRUE(hp_from_node);
ASSERT_EQ(kHp1, hp_from_node);
- ASSERT_EQ(kAddr1, dns_resolver::instance().resolve_address(hp_from_node));
+ ASSERT_EQ(kAddr1, hp_from_node.resolve());
}
// Test GET_HOST_PORTS-1.
diff --git a/src/runtime/test/dns_resolver_test.cpp b/src/runtime/test/dns_resolver_test.cpp
index 4e4ec57b2..0d805ebf8 100644
--- a/src/runtime/test/dns_resolver_test.cpp
+++ b/src/runtime/test/dns_resolver_test.cpp
@@ -34,7 +34,7 @@ TEST(host_port_test, dns_resolver)
// Resolve HOST_TYPE_IPV4 type host_port.
{
host_port hp("localhost", 8080);
- const auto &addr = dns_resolver::instance().resolve_address(hp);
+ const auto &addr = hp.resolve();
ASSERT_TRUE(rpc_address::from_ip_port("127.0.0.1", 8080) == addr ||
rpc_address::from_ip_port("127.0.1.1", 8080) == addr);
}
@@ -50,7 +50,7 @@ TEST(host_port_test, dns_resolver)
host_port hp2("localhost", 8081);
g_hp->set_leader(hp2);
- const auto &addr_grp = dns_resolver::instance().resolve_address(hp_grp);
+ const auto &addr_grp = hp_grp.resolve();
const auto *const g_addr = addr_grp.group_address();
ASSERT_EQ(g_addr->is_update_leader_automatically(),
g_hp->is_update_leader_automatically());
diff --git a/src/runtime/test_utils.h b/src/runtime/test_utils.h
index 29df59b28..e5c2d9110 100644
--- a/src/runtime/test_utils.h
+++ b/src/runtime/test_utils.h
@@ -98,7 +98,7 @@ public:
if (next_hp.port() != TEST_PORT_END) {
next_hp = dsn::host_port(next_hp.host(), next_hp.port() + 1);
LOG_INFO("test_client_server, talk_to_others: {}", next_hp);
- dsn_rpc_forward(message, dsn::dns_resolver::instance().resolve_address(next_hp));
+ dsn_rpc_forward(message, next_hp.resolve());
} else {
LOG_INFO("test_client_server, talk_to_me: {}", next_hp);
reply(message, next_hp.to_string());
diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h
index 1d6065717..7f88ad079 100644
--- a/src/shell/command_helper.h
+++ b/src/shell/command_helper.h
@@ -1292,11 +1292,7 @@ call_remote_command(shell_context *sc,
}
};
tasks[i] = dsn::dist::cmd::async_call_remote(
- dsn::dns_resolver::instance().resolve_address(nodes[i].hp),
- cmd,
- arguments,
- callback,
- std::chrono::milliseconds(5000));
+ nodes[i].hp.resolve(), cmd, arguments, callback, std::chrono::milliseconds(5000));
}
for (int i = 0; i < nodes.size(); ++i) {
tasks[i]->wait();
@@ -2161,10 +2157,10 @@ inline bool get_capacity_unit_stat(shell_context *sc,
continue;
}
nodes_stat[i].timestamp = info.timestamp_str;
- nodes_stat[i].node_address =
- dsn::dns_resolver::instance().resolve_address(nodes[i].hp).to_string();
+ nodes_stat[i].node_address = nodes[i].hp.resolve().to_string();
for (dsn::perf_counter_metric &m : info.counters) {
- int32_t app_id, pidx;
+ int32_t app_id{0};
+ int32_t pidx{0};
std::string counter_name;
bool r = parse_app_pegasus_perf_counter_name(m.name, app_id, pidx,
counter_name);
CHECK(r, "name = {}", m.name);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]