This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 19cd42c  [BUG] avoid std::function copy in client cache (#6186)
19cd42c is described below

commit 19cd42ccbd08809fa74105277e4963c97113b8bc
Author: stdpain <[email protected]>
AuthorDate: Fri Jul 16 09:20:28 2021 +0800

    [BUG] avoid std::function copy in client cache (#6186)
    
    * [BUG] avoid std::function copy in client cache
    
    * Refactor ClientFactory Name
---
 be/src/runtime/client_cache.cpp | 56 +++++++++++++++++++---------------------
 be/src/runtime/client_cache.h   | 57 +++++++++++++++++++++--------------------
 be/src/util/thrift_client.h     |  9 +------
 3 files changed, 57 insertions(+), 65 deletions(-)

diff --git a/be/src/runtime/client_cache.cpp b/be/src/runtime/client_cache.cpp
index a46ef86..1d823ed 100644
--- a/be/src/runtime/client_cache.cpp
+++ b/be/src/runtime/client_cache.cpp
@@ -45,11 +45,11 @@ ClientCacheHelper::~ClientCacheHelper() {
     }
 }
 
-Status ClientCacheHelper::get_client(const TNetworkAddress& hostport, 
client_factory factory_method,
+Status ClientCacheHelper::get_client(const TNetworkAddress& hostport, 
ClientFactory& factory_method,
                                      void** client_key, int timeout_ms) {
     std::lock_guard<std::mutex> lock(_lock);
     //VLOG_RPC << "get_client(" << hostport << ")";
-    ClientCacheMap::iterator cache_entry = _client_cache.find(hostport);
+    auto cache_entry = _client_cache.find(hostport);
 
     if (cache_entry == _client_cache.end()) {
         cache_entry = _client_cache.insert(std::make_pair(hostport, 
std::list<void*>())).first;
@@ -63,7 +63,7 @@ Status ClientCacheHelper::get_client(const TNetworkAddress& 
hostport, client_fac
         VLOG_RPC << "get_client(): cached client for " << hostport;
         info_list.pop_front();
     } else {
-        RETURN_IF_ERROR(create_client(hostport, factory_method, client_key, 
timeout_ms));
+        RETURN_IF_ERROR(_create_client(hostport, factory_method, client_key, 
timeout_ms));
     }
 
     _client_map[*client_key]->set_send_timeout(timeout_ms);
@@ -76,10 +76,10 @@ Status ClientCacheHelper::get_client(const TNetworkAddress& 
hostport, client_fac
     return Status::OK();
 }
 
-Status ClientCacheHelper::reopen_client(client_factory factory_method, void** 
client_key,
+Status ClientCacheHelper::reopen_client(ClientFactory& factory_method, void** 
client_key,
                                         int timeout_ms) {
     std::lock_guard<std::mutex> lock(_lock);
-    ClientMap::iterator i = _client_map.find(*client_key);
+    auto i = _client_map.find(*client_key);
     DCHECK(i != _client_map.end());
     ThriftClientImpl* info = i->second;
     const std::string ipaddress = info->ipaddress();
@@ -92,23 +92,23 @@ Status ClientCacheHelper::reopen_client(client_factory 
factory_method, void** cl
     // client instead.
     _client_map.erase(*client_key);
     delete info;
-    *client_key = NULL;
+    *client_key = nullptr;
 
     if (_metrics_enabled) {
         thrift_opened_clients->increment(-1);
     }
 
-    RETURN_IF_ERROR(create_client(make_network_address(ipaddress, port), 
factory_method, client_key,
-                                  timeout_ms));
+    RETURN_IF_ERROR(_create_client(make_network_address(ipaddress, port), 
factory_method,
+                                   client_key, timeout_ms));
 
     _client_map[*client_key]->set_send_timeout(timeout_ms);
     _client_map[*client_key]->set_recv_timeout(timeout_ms);
     return Status::OK();
 }
 
-Status ClientCacheHelper::create_client(const TNetworkAddress& hostport,
-                                        client_factory factory_method, void** 
client_key,
-                                        int timeout_ms) {
+Status ClientCacheHelper::_create_client(const TNetworkAddress& hostport,
+                                         ClientFactory& factory_method, void** 
client_key,
+                                         int timeout_ms) {
     std::unique_ptr<ThriftClientImpl> client_impl(factory_method(hostport, 
client_key));
     //VLOG_CONNECTION << "create_client(): adding new client for "
     //                << client_impl->ipaddress() << ":" << 
client_impl->port();
@@ -118,7 +118,7 @@ Status ClientCacheHelper::create_client(const 
TNetworkAddress& hostport,
     Status status = client_impl->open();
 
     if (!status.ok()) {
-        *client_key = NULL;
+        *client_key = nullptr;
         return status;
     }
 
@@ -133,13 +133,12 @@ Status ClientCacheHelper::create_client(const 
TNetworkAddress& hostport,
 }
 
 void ClientCacheHelper::release_client(void** client_key) {
-    DCHECK(*client_key != NULL) << "Trying to release NULL client";
+    DCHECK(*client_key != nullptr) << "Trying to release nullptr client";
     std::lock_guard<std::mutex> lock(_lock);
-    ClientMap::iterator client_map_entry = _client_map.find(*client_key);
+    auto client_map_entry = _client_map.find(*client_key);
     DCHECK(client_map_entry != _client_map.end());
     ThriftClientImpl* info = client_map_entry->second;
-    ClientCacheMap::iterator j =
-            _client_cache.find(make_network_address(info->ipaddress(), 
info->port()));
+    auto j = _client_cache.find(make_network_address(info->ipaddress(), 
info->port()));
     DCHECK(j != _client_cache.end());
 
     if (_max_cache_size_per_host >= 0 && j->second.size() >= 
_max_cache_size_per_host) {
@@ -159,12 +158,12 @@ void ClientCacheHelper::release_client(void** client_key) 
{
         thrift_used_clients->increment(-1);
     }
 
-    *client_key = NULL;
+    *client_key = nullptr;
 }
 
 void ClientCacheHelper::close_connections(const TNetworkAddress& hostport) {
     std::lock_guard<std::mutex> lock(_lock);
-    ClientCacheMap::iterator cache_entry = _client_cache.find(hostport);
+    auto cache_entry = _client_cache.find(hostport);
 
     if (cache_entry == _client_cache.end()) {
         return;
@@ -172,7 +171,7 @@ void ClientCacheHelper::close_connections(const 
TNetworkAddress& hostport) {
 
     VLOG_RPC << "Invalidating all " << cache_entry->second.size() << " clients 
for: " << hostport;
     for (void* client_key : cache_entry->second) {
-        ClientMap::iterator client_map_entry = _client_map.find(client_key);
+        auto client_map_entry = _client_map.find(client_key);
         DCHECK(client_map_entry != _client_map.end());
         ThriftClientImpl* info = client_map_entry->second;
         info->close();
@@ -185,12 +184,13 @@ std::string ClientCacheHelper::debug_string() {
     std::stringstream out;
     out << "ClientCacheHelper(#hosts=" << _client_cache.size() << " [";
 
-    for (ClientCacheMap::iterator i = _client_cache.begin(); i != 
_client_cache.end(); ++i) {
-        if (i != _client_cache.begin()) {
+    bool isfirst = true;
+    for (const auto& [endpoint, client_keys] : _client_cache) {
+        if (!isfirst) {
             out << " ";
+            isfirst = false;
         }
-
-        out << i->first << ":" << i->second.size();
+        out << endpoint << ":" << client_keys.size();
     }
 
     out << "])";
@@ -201,14 +201,12 @@ void ClientCacheHelper::test_shutdown() {
     std::vector<TNetworkAddress> hostports;
     {
         std::lock_guard<std::mutex> lock(_lock);
-        for (const ClientCacheMap::value_type& i : _client_cache) {
-            hostports.push_back(i.first);
+        for (const auto& [endpoint, _] : _client_cache) {
+            hostports.push_back(endpoint);
         }
     }
-
-    for (std::vector<TNetworkAddress>::iterator it = hostports.begin(); it != 
hostports.end();
-         ++it) {
-        close_connections(*it);
+    for (const auto& endpoint : hostports) {
+        close_connections(endpoint);
     }
 }
 
diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h
index 4211e1e..8ef1a58 100644
--- a/be/src/runtime/client_cache.h
+++ b/be/src/runtime/client_cache.h
@@ -59,21 +59,21 @@ public:
     ~ClientCacheHelper();
     // Callback method which produces a client object when one cannot be
     // found in the cache. Supplied by the ClientCache wrapper.
-    typedef std::function<ThriftClientImpl*(const TNetworkAddress& hostport, 
void** client_key)>
-            client_factory;
+    using ClientFactory =
+            std::function<ThriftClientImpl*(const TNetworkAddress& hostport, 
void** client_key)>;
 
     // Return client for specific host/port in 'client'. If a client
-    // is not available, the client parameter is set to NULL.
-    Status get_client(const TNetworkAddress& hostport, client_factory 
factory_method,
+    // is not available, the client parameter is set to nullptr.
+    Status get_client(const TNetworkAddress& hostport, ClientFactory& 
factory_method,
                       void** client_key, int timeout_ms);
 
     // Close and delete the underlying transport and remove the client from 
_client_map.
     // Return a new client connecting to the same host/port.
-    // Return an error status and set client_key to NULL if a new client cannot
+    // Return an error status and set client_key to nullptr if a new client 
cannot
     // created.
-    Status reopen_client(client_factory factory_method, void** client_key, int 
timeout_ms);
+    Status reopen_client(ClientFactory& factory_method, void** client_key, int 
timeout_ms);
 
-    // Return a client to the cache, without closing it, and set *client_key 
to NULL.
+    // Return a client to the cache, without closing it, and set *client_key 
to nullptr.
     void release_client(void** client_key);
 
     // Close all connections to a host (e.g., in case of failure) so that on 
their
@@ -101,11 +101,11 @@ private:
     std::mutex _lock;
 
     // map from (host, port) to list of client keys for that address
-    typedef std::unordered_map<TNetworkAddress, std::list<void*>> 
ClientCacheMap;
+    using ClientCacheMap = std::unordered_map<TNetworkAddress, 
std::list<void*>>;
     ClientCacheMap _client_cache;
 
     // Map from client key back to its associated ThriftClientImpl transport
-    typedef std::unordered_map<void*, ThriftClientImpl*> ClientMap;
+    using ClientMap = std::unordered_map<void*, ThriftClientImpl*>;
     ClientMap _client_map;
 
     bool _metrics_enabled;
@@ -122,8 +122,8 @@ private:
     IntGauge* thrift_opened_clients;
 
     // Create a new client for specific host/port in 'client' and put it in 
_client_map
-    Status create_client(const TNetworkAddress& hostport, client_factory 
factory_method,
-                         void** client_key, int timeout_ms);
+    Status _create_client(const TNetworkAddress& hostport, ClientFactory& 
factory_method,
+                          void** client_key, int timeout_ms);
 };
 
 template <class T>
@@ -147,26 +147,26 @@ template <class T>
 class ClientConnection {
 public:
     ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress& 
address, Status* status)
-            : _client_cache(client_cache), _client(NULL) {
+            : _client_cache(client_cache), _client(nullptr) {
         *status = _client_cache->get_client(address, &_client, 0);
 
         if (status->ok()) {
-            DCHECK(_client != NULL);
+            DCHECK(_client != nullptr);
         }
     }
 
     ClientConnection(ClientCache<T>* client_cache, const TNetworkAddress& 
address, int timeout_ms,
                      Status* status)
-            : _client_cache(client_cache), _client(NULL) {
+            : _client_cache(client_cache), _client(nullptr) {
         *status = _client_cache->get_client(address, &_client, timeout_ms);
 
         if (status->ok()) {
-            DCHECK(_client != NULL);
+            DCHECK(_client != nullptr);
         }
     }
 
     ~ClientConnection() {
-        if (_client != NULL) {
+        if (_client != nullptr) {
             _client_cache->release_client(&_client);
         }
     }
@@ -187,7 +187,7 @@ private:
 template <class T>
 class ClientCache {
 public:
-    typedef ThriftClient<T> Client;
+    using Client = ThriftClient<T>;
 
     ClientCache() : _client_cache_helper() {
         _client_factory =
@@ -229,7 +229,8 @@ private:
     ClientCacheHelper _client_cache_helper;
 
     // Function pointer, bound to make_client, which produces clients when the 
cache is empty
-    ClientCacheHelper::client_factory _client_factory;
+    using ClientFactory = ClientCacheHelper::ClientFactory;
+    ClientFactory _client_factory;
 
     // Obtains a pointer to a Thrift interface object (of type T),
     // backed by a live transport which is already open. Returns
@@ -242,13 +243,13 @@ private:
     // Close and delete the underlying transport. Return a new client 
connecting to the
     // same host/port.
     // Return an error status if a new connection cannot be established and 
*client will be
-    // NULL in that case.
+    // nullptr in that case.
     Status reopen_client(T** client, int timeout_ms) {
         return _client_cache_helper.reopen_client(_client_factory, 
reinterpret_cast<void**>(client),
                                                   timeout_ms);
     }
 
-    // Return the client to the cache and set *client to NULL.
+    // Return the client to the cache and set *client to nullptr.
     void release_client(T** client) {
         return 
_client_cache_helper.release_client(reinterpret_cast<void**>(client));
     }
@@ -278,17 +279,17 @@ private:
 // Doris backend client cache, used by a backend to send requests
 // to any other backend.
 class BackendServiceClient;
-typedef ClientCache<BackendServiceClient> BackendServiceClientCache;
-typedef ClientConnection<BackendServiceClient> BackendServiceConnection;
+using BackendServiceClientCache = ClientCache<BackendServiceClient>;
+using BackendServiceConnection = ClientConnection<BackendServiceClient>;
 class FrontendServiceClient;
-typedef ClientCache<FrontendServiceClient> FrontendServiceClientCache;
-typedef ClientConnection<FrontendServiceClient> FrontendServiceConnection;
+using FrontendServiceClientCache = ClientCache<FrontendServiceClient>;
+using FrontendServiceConnection = ClientConnection<FrontendServiceClient>;
 class TPaloBrokerServiceClient;
-typedef ClientCache<TPaloBrokerServiceClient> BrokerServiceClientCache;
-typedef ClientConnection<TPaloBrokerServiceClient> BrokerServiceConnection;
+using BrokerServiceClientCache = ClientCache<TPaloBrokerServiceClient>;
+using BrokerServiceConnection = ClientConnection<TPaloBrokerServiceClient>;
 class TExtDataSourceServiceClient;
-typedef ClientCache<TExtDataSourceServiceClient> 
ExtDataSourceServiceClientCache;
-typedef ClientConnection<TExtDataSourceServiceClient> 
ExtDataSourceServiceConnection;
+using ExtDataSourceServiceClientCache = 
ClientCache<TExtDataSourceServiceClient>;
+using ExtDataSourceServiceConnection = 
ClientConnection<TExtDataSourceServiceClient>;
 
 } // namespace doris
 
diff --git a/be/src/util/thrift_client.h b/be/src/util/thrift_client.h
index e555914..d9a8034 100644
--- a/be/src/util/thrift_client.h
+++ b/be/src/util/thrift_client.h
@@ -38,10 +38,6 @@
 #include "util/thrift_server.h"
 
 namespace doris {
-
-template <class InterfaceType>
-class ThriftClient;
-
 // Super class for templatized thrift clients.
 class ThriftClientImpl {
 public:
@@ -75,10 +71,7 @@ protected:
               _port(port),
               _socket(new apache::thrift::transport::TSocket(ipaddress, port)) 
{}
 
-private:
-    template <class InterfaceType>
-    friend class ThriftClient;
-
+protected:
     std::string _ipaddress;
     int _port;
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to