This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7851563829b [fix](brpc_client_cache) resolve hostname in DNS cache before passing to brpc (#40074) (#40786)
7851563829b is described below
commit 7851563829b6c5f6f78fbb4f61ff03459d45fefc
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Sep 13 14:28:01 2024 +0800
[fix](brpc_client_cache) resolve hostname in DNS cache before passing to brpc (#40074) (#40786)
backport #40074
---
be/src/exec/rowid_fetcher.cpp | 3 ++-
be/src/exprs/runtime_filter.cpp | 7 +++----
be/src/olap/single_replica_compaction.cpp | 2 +-
be/src/runtime/runtime_filter_mgr.cpp | 12 +++++++++--
be/src/service/internal_service.cpp | 5 +++++
be/src/util/brpc_client_cache.h | 35 ++++++++++++++++++++++---------
be/src/util/proto_util.h | 15 +++++++++++++
be/src/vec/functions/function_rpc.cpp | 5 +++++
be/src/vec/sink/load_stream_stub.cpp | 10 ++++++---
be/src/vec/sink/writer/vtablet_writer.cpp | 21 ++++++++++++++++++-
10 files changed, 93 insertions(+), 22 deletions(-)
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index c9aa200e1d6..b51e263d86b 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -71,7 +71,8 @@ Status RowIDFetcher::init() {
if (!client) {
LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host
<< ", port=" << node_info.brpc_port;
- return Status::InternalError("RowIDFetcher failed to init rpc
client");
+ return Status::InternalError("RowIDFetcher failed to init rpc
client, host={}, port={}",
+ node_info.host, node_info.brpc_port);
}
_stubs.push_back(client);
}
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 1c0cdffc0f5..4e9a12b5bc5 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1100,9 +1100,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState*
state, uint64_t local_filt
std::shared_ptr<PBackendService_Stub> stub(
_state->exec_env->brpc_internal_client_cache()->get_client(addr));
if (!stub) {
- std::string msg =
- fmt::format("Get rpc stub failed, host={}, port=",
addr.hostname, addr.port);
- return Status::InternalError(msg);
+ return Status::InternalError("Get rpc stub failed, host={}, port={}",
addr.hostname,
+ addr.port);
}
auto request = std::make_shared<PSendFilterSizeRequest>();
@@ -1135,7 +1134,7 @@ Status IRuntimeFilter::push_to_remote(const
TNetworkAddress* addr, bool opt_remo
_state->exec_env->brpc_internal_client_cache()->get_client(*addr));
if (!stub) {
return Status::InternalError(
- fmt::format("Get rpc stub failed, host={}, port=",
addr->hostname, addr->port));
+ fmt::format("Get rpc stub failed, host={}, port={}",
addr->hostname, addr->port));
}
auto merge_filter_request = std::make_shared<PMergeFilterRequest>();
diff --git a/be/src/olap/single_replica_compaction.cpp
b/be/src/olap/single_replica_compaction.cpp
index 393bfb99f7b..8257a8182a0 100644
--- a/be/src/olap/single_replica_compaction.cpp
+++ b/be/src/olap/single_replica_compaction.cpp
@@ -175,7 +175,7 @@ Status
SingleReplicaCompaction::_get_rowset_verisons_from_peer(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr.host,
addr.brpc_port);
if (stub == nullptr) {
- return Status::Aborted("get rpc stub failed");
+ return Status::Aborted("get rpc stub failed, host={}, port={}",
addr.host, addr.brpc_port);
}
brpc::Controller cntl;
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 5b61cc87361..24baf9b6c97 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -342,10 +342,17 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
cnt_val->global_size += request->filter_size();
cnt_val->source_addrs.push_back(request->source_addr());
+ Status st = Status::OK();
if (cnt_val->source_addrs.size() == cnt_val->producer_size) {
for (auto addr : cnt_val->source_addrs) {
std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
+ if (stub == nullptr) {
+ LOG(WARNING) << "Failed to init rpc to " << addr.hostname() <<
":" << addr.port();
+ st = Status::InternalError("Failed to init rpc to {}:{}",
addr.hostname(),
+ addr.port());
+ continue;
+ }
auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
DummyBrpcCallback<PSyncFilterSizeResponse>>::
@@ -365,7 +372,7 @@ Status
RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
closure.release();
}
}
- return Status::OK();
+ return st;
}
Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest*
request) {
@@ -395,6 +402,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
int64_t start_merge = MonotonicMillis();
auto filter_id = request->filter_id();
std::map<int, CntlValwithLock>::iterator iter;
+ Status st = Status::OK();
{
std::shared_lock<std::shared_mutex> guard(_filter_map_mutex);
iter = _filter_map.find(filter_id);
@@ -587,7 +595,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const
PMergeFilterRequest* requ
}
}
}
- return Status::OK();
+ return st;
}
Status RuntimeFilterMergeController::acquire(
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 0801f30fb2e..13faed18e61 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1110,6 +1110,11 @@ void
PInternalServiceImpl::fetch_remote_tablet_schema(google::protobuf::RpcContr
std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
host, brpc_port));
+ if (stub == nullptr) {
+ LOG(WARNING) << "Failed to init rpc to " << host << ":" <<
brpc_port;
+ st = Status::InternalError("Failed to init rpc to {}:{}",
host, brpc_port);
+ continue;
+ }
rpc_contexts[i].cid = rpc_contexts[i].cntl.call_id();
rpc_contexts[i].cntl.set_timeout_ms(config::fetch_remote_schema_rpc_timeout_ms);
stub->fetch_remote_tablet_schema(&rpc_contexts[i].cntl,
&remote_request,
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index 09c92fb398e..24bd284f302 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -83,33 +83,47 @@ public:
}
std::shared_ptr<T> get_client(const std::string& host, int port) {
- std::string realhost;
- realhost = host;
- if (!is_valid_ip(host)) {
- Status status = ExecEnv::GetInstance()->dns_cache()->get(host,
&realhost);
+ std::string realhost = host;
+ auto dns_cache = ExecEnv::GetInstance()->dns_cache();
+ if (dns_cache == nullptr) {
+ LOG(WARNING) << "DNS cache is not initialized, skipping hostname
resolve";
+ } else if (!is_valid_ip(host)) {
+ Status status = dns_cache->get(host, &realhost);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host:" <<
status.to_string();
return nullptr;
}
}
std::string host_port = get_host_port(realhost, port);
- return get_client(host_port);
- }
-
- std::shared_ptr<T> get_client(const std::string& host_port) {
std::shared_ptr<T> stub_ptr;
auto get_value = [&stub_ptr](const auto& v) { stub_ptr = v.second; };
if (LIKELY(_stub_map.if_contains(host_port, get_value))) {
+ DCHECK(stub_ptr != nullptr);
return stub_ptr;
}
// new one stub and insert into map
auto stub = get_new_client_no_cache(host_port);
- _stub_map.try_emplace_l(
- host_port, [&stub](const auto& v) { stub = v.second; }, stub);
+ if (stub != nullptr) {
+ _stub_map.try_emplace_l(
+ host_port, [&stub](const auto& v) { stub = v.second; },
stub);
+ }
return stub;
}
+ std::shared_ptr<T> get_client(const std::string& host_port) {
+ int pos = host_port.rfind(':');
+ std::string host = host_port.substr(0, pos);
+ int port = 0;
+ try {
+ port = stoi(host_port.substr(pos + 1));
+ } catch (const std::exception& err) {
+ LOG(WARNING) << "failed to parse port from " << host_port << ": "
<< err.what();
+ return nullptr;
+ }
+ return get_client(host, port);
+ }
+
std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
const std::string& protocol =
"",
const std::string&
connection_type = "",
@@ -143,6 +157,7 @@ public:
channel->Init(host_port.c_str(),
config::rpc_load_balancer.c_str(), &options);
}
if (ret_code) {
+ LOG(WARNING) << "Failed to initialize brpc Channel to " <<
host_port;
return nullptr;
}
return std::make_shared<T>(channel.release(),
google::protobuf::Service::STUB_OWNS_CHANNEL);
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index f77a1f637f3..c1994d1feeb 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -71,11 +71,26 @@ Status transmit_block_httpv2(ExecEnv* exec_env,
std::unique_ptr<Closure> closure
TNetworkAddress brpc_dest_addr) {
RETURN_IF_ERROR(request_embed_attachment_contain_blockv2(closure->request_.get(),
closure));
+ std::string host = brpc_dest_addr.hostname;
+ auto dns_cache = ExecEnv::GetInstance()->dns_cache();
+ if (dns_cache == nullptr) {
+ LOG(WARNING) << "DNS cache is not initialized, skipping hostname
resolve";
+ } else if (!is_valid_ip(brpc_dest_addr.hostname)) {
+ Status status = dns_cache->get(brpc_dest_addr.hostname, &host);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get ip from host " <<
brpc_dest_addr.hostname << ": "
+ << status.to_string();
+ return Status::InternalError("failed to get ip from host {}",
brpc_dest_addr.hostname);
+ }
+ }
//format an ipv6 address
std::string brpc_url = get_brpc_http_url(brpc_dest_addr.hostname,
brpc_dest_addr.port);
std::shared_ptr<PBackendService_Stub> brpc_http_stub =
exec_env->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
"http");
+ if (brpc_http_stub == nullptr) {
+ return Status::InternalError("failed to open brpc http client to {}",
brpc_url);
+ }
closure->cntl_->http_request().uri() =
brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
closure->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);
diff --git a/be/src/vec/functions/function_rpc.cpp
b/be/src/vec/functions/function_rpc.cpp
index ba171ffbbc9..c27383dac62 100644
--- a/be/src/vec/functions/function_rpc.cpp
+++ b/be/src/vec/functions/function_rpc.cpp
@@ -46,6 +46,11 @@ Status RPCFnImpl::vec_call(FunctionContext* context, Block&
block, const ColumnN
size_t result, size_t input_rows_count) {
PFunctionCallRequest request;
PFunctionCallResponse response;
+ if (_client == nullptr) {
+ return Status::InternalError(
+ "call to rpc function {} failed: init rpc error, server addr =
{}", _signature,
+ _server_addr);
+ }
request.set_function_name(_function_name);
RETURN_IF_ERROR(_convert_block_to_proto(block, arguments,
input_rows_count, &request));
brpc::Controller cntl;
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index e899486e854..e29d64118b9 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -152,7 +152,6 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
return _init_st;
}
_dst_id = node_info.id;
- std::string host_port = get_host_port(node_info.host, node_info.brpc_port);
brpc::StreamOptions opt;
opt.max_buf_size = config::load_stream_max_buf_size;
opt.idle_timeout_ms = idle_timeout_ms;
@@ -185,7 +184,11 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
}
POpenLoadStreamResponse response;
// set connection_group "streaming" to distinguish with non-streaming
connections
- const auto& stub = client_cache->get_client(host_port);
+ const auto& stub = client_cache->get_client(node_info.host,
node_info.brpc_port);
+ if (stub == nullptr) {
+ return Status::InternalError("failed to init brpc client to {}:{}",
node_info.host,
+ node_info.brpc_port);
+ }
stub->open_load_stream(&cntl, &request, &response, nullptr);
for (const auto& resp : response.tablet_schemas()) {
auto tablet_schema = std::make_unique<TabletSchema>();
@@ -200,7 +203,8 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
cntl.ErrorText());
return _init_st;
}
- LOG(INFO) << "open load stream to " << host_port << ", " << *this;
+ LOG(INFO) << "open load stream to host=" << node_info.host << ", port=" <<
node_info.brpc_port
+ << ", " << *this;
_is_init.store(true);
return Status::OK();
}
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 1e6b8f7b868..e946a73bfed 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -700,11 +700,30 @@ void VNodeChannel::try_send_pending_block(RuntimeState*
state) {
return;
}
+ std::string host = _node_info.host;
+ auto dns_cache = ExecEnv::GetInstance()->dns_cache();
+ if (dns_cache == nullptr) {
+ LOG(WARNING) << "DNS cache is not initialized, skipping hostname
resolve";
+ } else if (!is_valid_ip(_node_info.host)) {
+ Status status = dns_cache->get(_node_info.host, &host);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get ip from host " <<
_node_info.host << ": "
+ << status.to_string();
+ _send_block_callback->clear_in_flight();
+ return;
+ }
+ }
//format an ipv6 address
- std::string brpc_url = get_brpc_http_url(_node_info.host,
_node_info.brpc_port);
+ std::string brpc_url = get_brpc_http_url(host, _node_info.brpc_port);
std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
_state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
"http");
+ if (_brpc_http_stub == nullptr) {
+ cancel(fmt::format("{}, failed to open brpc http client to {}",
channel_info(),
+ brpc_url));
+ _send_block_callback->clear_in_flight();
+ return;
+ }
_send_block_callback->cntl_->http_request().uri() =
brpc_url +
"/PInternalServiceImpl/tablet_writer_add_block_by_http";
_send_block_callback->cntl_->http_request().set_method(brpc::HTTP_METHOD_POST);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]