This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 19cd42c [BUG] avoid std::function copy in client cache (#6186)
19cd42c is described below
commit 19cd42ccbd08809fa74105277e4963c97113b8bc
Author: stdpain <[email protected]>
AuthorDate: Fri Jul 16 09:20:28 2021 +0800
[BUG] avoid std::function copy in client cache (#6186)
* [BUG] avoid std::function copy in client cache
* Refactor ClientFactory Name
---
be/src/runtime/client_cache.cpp | 56 +++++++++++++++++++---------------------
be/src/runtime/client_cache.h | 57 +++++++++++++++++++++--------------------
be/src/util/thrift_client.h | 9 +------
3 files changed, 57 insertions(+), 65 deletions(-)
diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp
index a46ef86..1d823ed 100644
--- a/be/src/runtime/client_cache.cpp
+++ b/be/src/runtime/client_cache.cpp
@@ -45,11 +45,11 @@ ClientCacheHelper::~ClientCacheHelper() {
}
}
-Status ClientCacheHelper::get_client(const TNetworkAddress& hostport,
client_factory factory_method,
+Status ClientCacheHelper::get_client(const TNetworkAddress& hostport,
ClientFactory& factory_method,
void** client_key, int timeout_ms) {
std::lock_guard<std::mutex> lock(_lock);
//VLOG_RPC << "get_client(" << hostport << ")";
- ClientCacheMap::iterator cache_entry = _client_cache.find(hostport);
+ auto cache_entry = _client_cache.find(hostport);
if (cache_entry == _client_cache.end()) {
cache_entry = _client_cache.insert(std::make_pair(hostport,
std::list<void*>())).first;
@@ -63,7 +63,7 @@ Status ClientCacheHelper::get_client(const TNetworkAddress&
hostport, client_fac
VLOG_RPC << "get_client(): cached client for " << hostport;
info_list.pop_front();
} else {
- RETURN_IF_ERROR(create_client(hostport, factory_method, client_key,
timeout_ms));
+ RETURN_IF_ERROR(_create_client(hostport, factory_method, client_key,
timeout_ms));
}
_client_map[*client_key]->set_send_timeout(timeout_ms);
@@ -76,10 +76,10 @@ Status ClientCacheHelper::get_client(const TNetworkAddress&
hostport, client_fac
return Status::OK();
}
-Status ClientCacheHelper::reopen_client(client_factory factory_method, void**
client_key,
+Status ClientCacheHelper::reopen_client(ClientFactory& factory_method, void**
client_key,
int timeout_ms) {
std::lock_guard<std::mutex> lock(_lock);
- ClientMap::iterator i = _client_map.find(*client_key);
+ auto i = _client_map.find(*client_key);
DCHECK(i != _client_map.end());
ThriftClientImpl* info = i->second;
const std::string ipaddress = info->ipaddress();
@@ -92,23 +92,23 @@ Status ClientCacheHelper::reopen_client(client_factory
factory_method, void** cl
// client instead.
_client_map.erase(*client_key);
delete info;
- *client_key = NULL;
+ *client_key = nullptr;
if (_metrics_enabled) {
thrift_opened_clients->increment(-1);
}
- RETURN_IF_ERROR(create_client(make_network_address(ipaddress, port),
factory_method, client_key,
- timeout_ms));
+ RETURN_IF_ERROR(_create_client(make_network_address(ipaddress, port),
factory_method,
+ client_key, timeout_ms));
_client_map[*client_key]->set_send_timeout(timeout_ms);
_client_map[*client_key]->set_recv_timeout(timeout_ms);
return Status::OK();
}
-Status ClientCacheHelper::create_client(const TNetworkAddress& hostport,
- client_factory factory_method, void**
client_key,
- int timeout_ms) {
+Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport,
+ ClientFactory& factory_method, void**
client_key,
+ int timeout_ms) {
std::unique_ptr<ThriftClientImpl> client_impl(factory_method(hostport,
client_key));
//VLOG_CONNECTION << "create_client(): adding new client for "
// << client_impl->ipaddress() << ":" <<
client_impl->port();
@@ -118,7 +118,7 @@ Status ClientCacheHelper::create_client(const
TNetworkAddress& hostport,
Status status = client_impl->open();
if (!status.ok()) {
- *client_key = NULL;
+ *client_key = nullptr;
return status;
}
@@ -133,13 +133,12 @@ Status ClientCacheHelper::create_client(const
TNetworkAddress& hostport,
}
void ClientCacheHelper::release_client(void** client_key) {
- DCHECK(*client_key != NULL) << "Trying to release NULL client";
+ DCHECK(*client_key != nullptr) << "Trying to release nullptr client";
std::lock_guard<std::mutex> lock(_lock);
- ClientMap::iterator client_map_entry = _client_map.find(*client_key);
+ auto client_map_entry = _client_map.find(*client_key);
DCHECK(client_map_entry != _client_map.end());
ThriftClientImpl* info = client_map_entry->second;
- ClientCacheMap::iterator j =
- _client_cache.find(make_network_address(info->ipaddress(),
info->port()));
+ auto j = _client_cache.find(make_network_address(info->ipaddress(),
info->port()));
DCHECK(j != _client_cache.end());
if (_max_cache_size_per_host >= 0 && j->second.size() >=
_max_cache_size_per_host) {
@@ -159,12 +158,12 @@ void ClientCacheHelper::release_client(void** client_key)
{
thrift_used_clients->increment(-1);
}
- *client_key = NULL;
+ *client_key = nullptr;
}
void ClientCacheHelper::close_connections(const TNetworkAddress& hostport) {
std::lock_guard<std::mutex> lock(_lock);
- ClientCacheMap::iterator cache_entry = _client_cache.find(hostport);
+ auto cache_entry = _client_cache.find(hostport);
if (cache_entry == _client_cache.end()) {
return;
@@ -172,7 +171,7 @@ void ClientCacheHelper::close_connections(const
TNetworkAddress& hostport) {
VLOG_RPC << "Invalidating all " << cache_entry->second.size() << " clients
for: " << hostport;
for (void* client_key : cache_entry->second) {
- ClientMap::iterator client_map_entry = _client_map.find(client_key);
+ auto client_map_entry = _client_map.find(client_key);
DCHECK(client_map_entry != _client_map.end());
ThriftClientImpl* info = client_map_entry->second;
info->close();
@@ -185,12 +184,13 @@ std::string ClientCacheHelper::debug_string() {
std::stringstream out;
out << "ClientCacheHelper(#hosts=" << _client_cache.size() << " [";
- for (ClientCacheMap::iterator i = _client_cache.begin(); i !=
_client_cache.end(); ++i) {
- if (i != _client_cache.begin()) {
+ bool isfirst = true;
+ for (const auto& [endpoint, client_keys] : _client_cache) {
+ if (!isfirst) {
out << " ";
+ isfirst = false;
}
-
- out << i->first << ":" << i->second.size();
+ out << endpoint << ":" << client_keys.size();
}
out << "])";
@@ -201,14 +201,12 @@ void ClientCacheHelper::test_shutdown() {
std::vector<TNetworkAddress> hostports;
{
std::lock_guard<std::mutex> lock(_lock);
- for (const ClientCacheMap::value_type& i : _client_cache) {
- hostports.push_back(i.first);
+ for (const auto& [endpoint, _] : _client_cache) {
+ hostports.push_back(endpoint);
}
}
-
- for (std::vector<TNetworkAddress>::iterator it = hostports.begin(); it !=
hostports.end();
- ++it) {
- close_connections(*it);
+ for (const auto& endpoint : hostports) {
+ close_connections(endpoint);
}
}
diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h
index 4211e1e..8ef1a58 100644
--- a/be/src/runtime/client_cache.h
+++ b/be/src/runtime/client_cache.h
@@ -59,21 +59,21 @@ public:
~ClientCacheHelper();
// Callback method which produces a client object when one cannot be
// found in the cache. Supplied by the ClientCache wrapper.
- typedef std::function<ThriftClientImpl*(const TNetworkAddress& hostport,
void** client_key)>
- client_factory;
+ using ClientFactory =
+ std::function<ThriftClientImpl*(const TNetworkAddress& hostport,
void** client_key)>;
// Return client for specific host/port in 'client'. If a client
- // is not available, the client parameter is set to NULL.
- Status get_client(const TNetworkAddress& hostport, client_factory
factory_method,
+ // is not available, the client parameter is set to nullptr.
+ Status get_client(const TNetworkAddress& hostport, ClientFactory&
factory_method,
void** client_key, int timeout_ms);
// Close and delete the underlying transport and remove the client from
_client_map.
// Return a new client connecting to the same host/port.
- // Return an error status and set client_key to NULL if a new client cannot
+ // Return an error status and set client_key to nullptr if a new client
cannot
// created.
- Status reopen_client(client_factory factory_method, void** client_key, int
timeout_ms);
+ Status reopen_client(ClientFactory& factory_method, void** client_key, int
timeout_ms);
- // Return a client to the cache, without closing it, and set *client_key
to NULL.
+ // Return a client to the cache, without closing it, and set *client_key
to nullptr.
void release_client(void** client_key);
// Close all connections to a host (e.g., in case of failure) so that on
their
@@ -101,11 +101,11 @@ private:
std::mutex _lock;
// map from (host, port) to list of client keys for that address
- typedef std::unordered_map<TNetworkAddress, std::list<void*>>
ClientCacheMap;
+ using ClientCacheMap = std::unordered_map<TNetworkAddress,
std::list<void*>>;
ClientCacheMap _client_cache;
// Map from client key back to its associated ThriftClientImpl transport
- typedef std::unordered_map<void*, ThriftClientImpl*> ClientMap;
+ using ClientMap = std::unordered_map<void*, ThriftClientImpl*>;
ClientMap _client_map;
bool _metrics_enabled;
@@ -122,8 +122,8 @@ private:
IntGauge* thrift_opened_clients;
// Create a new client for specific host/port in 'client' and put it in
_client_map
- Status create_client(const TNetworkAddress& hostport, client_factory
factory_method,
- void** client_key, int timeout_ms);
+ Status _create_client(const TNetworkAddress& hostport, ClientFactory&
factory_method,
+ void** client_key, int timeout_ms);
};
template <class T>
@@ -147,26 +147,26 @@ template <class T>
class ClientConnection {
public:
ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress&
address, Status* status)
- : _client_cache(client_cache), _client(NULL) {
+ : _client_cache(client_cache), _client(nullptr) {
*status = _client_cache->get_client(address, &_client, 0);
if (status->ok()) {
- DCHECK(_client != NULL);
+ DCHECK(_client != nullptr);
}
}
ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress&
address, int timeout_ms,
Status* status)
- : _client_cache(client_cache), _client(NULL) {
+ : _client_cache(client_cache), _client(nullptr) {
*status = _client_cache->get_client(address, &_client, timeout_ms);
if (status->ok()) {
- DCHECK(_client != NULL);
+ DCHECK(_client != nullptr);
}
}
~ClientConnection() {
- if (_client != NULL) {
+ if (_client != nullptr) {
_client_cache->release_client(&_client);
}
}
@@ -187,7 +187,7 @@ private:
template <class T>
class ClientCache {
public:
- typedef ThriftClient<T> Client;
+ using Client = ThriftClient<T>;
ClientCache() : _client_cache_helper() {
_client_factory =
@@ -229,7 +229,8 @@ private:
ClientCacheHelper _client_cache_helper;
// Function pointer, bound to make_client, which produces clients when the
cache is empty
- ClientCacheHelper::client_factory _client_factory;
+ using ClientFactory = ClientCacheHelper::ClientFactory;
+ ClientFactory _client_factory;
// Obtains a pointer to a Thrift interface object (of type T),
// backed by a live transport which is already open. Returns
@@ -242,13 +243,13 @@ private:
// Close and delete the underlying transport. Return a new client
connecting to the
// same host/port.
// Return an error status if a new connection cannot be established and
*client will be
- // NULL in that case.
+ // nullptr in that case.
Status reopen_client(T** client, int timeout_ms) {
return _client_cache_helper.reopen_client(_client_factory,
reinterpret_cast<void**>(client),
timeout_ms);
}
- // Return the client to the cache and set *client to NULL.
+ // Return the client to the cache and set *client to nullptr.
void release_client(T** client) {
return
_client_cache_helper.release_client(reinterpret_cast<void**>(client));
}
@@ -278,17 +279,17 @@ private:
// Doris backend client cache, used by a backend to send requests
// to any other backend.
class BackendServiceClient;
-typedef ClientCache<BackendServiceClient> BackendServiceClientCache;
-typedef ClientConnection<BackendServiceClient> BackendServiceConnection;
+using BackendServiceClientCache = ClientCache<BackendServiceClient>;
+using BackendServiceConnection = ClientConnection<BackendServiceClient>;
class FrontendServiceClient;
-typedef ClientCache<FrontendServiceClient> FrontendServiceClientCache;
-typedef ClientConnection<FrontendServiceClient> FrontendServiceConnection;
+using FrontendServiceClientCache = ClientCache<FrontendServiceClient>;
+using FrontendServiceConnection = ClientConnection<FrontendServiceClient>;
class TPaloBrokerServiceClient;
-typedef ClientCache<TPaloBrokerServiceClient> BrokerServiceClientCache;
-typedef ClientConnection<TPaloBrokerServiceClient> BrokerServiceConnection;
+using BrokerServiceClientCache = ClientCache<TPaloBrokerServiceClient>;
+using BrokerServiceConnection = ClientConnection<TPaloBrokerServiceClient>;
class TExtDataSourceServiceClient;
-typedef ClientCache<TExtDataSourceServiceClient>
ExtDataSourceServiceClientCache;
-typedef ClientConnection<TExtDataSourceServiceClient>
ExtDataSourceServiceConnection;
+using ExtDataSourceServiceClientCache =
ClientCache<TExtDataSourceServiceClient>;
+using ExtDataSourceServiceConnection =
ClientConnection<TExtDataSourceServiceClient>;
} // namespace doris
diff --git a/be/src/util/thrift_client.h b/be/src/util/thrift_client.h
index e555914..d9a8034 100644
--- a/be/src/util/thrift_client.h
+++ b/be/src/util/thrift_client.h
@@ -38,10 +38,6 @@
#include "util/thrift_server.h"
namespace doris {
-
-template <class InterfaceType>
-class ThriftClient;
-
// Super class for templatized thrift clients.
class ThriftClientImpl {
public:
@@ -75,10 +71,7 @@ protected:
_port(port),
_socket(new apache::thrift::transport::TSocket(ipaddress, port))
{}
-private:
- template <class InterfaceType>
- friend class ThriftClient;
-
+protected:
std::string _ipaddress;
int _port;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]