This is an automated email from the ASF dual-hosted git repository. zghao pushed a commit to branch HBASE-14850 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit a2cc4548fcb44ffb8150249716ce28637a642aab Author: Enis Soztutar <[email protected]> AuthorDate: Fri Jan 13 11:29:00 2017 -0800 HBASE-17463 [C++] RpcClient should close the thread pool --- hbase-native-client/connection/connection-pool.cc | 3 +++ hbase-native-client/connection/connection-pool.h | 5 +++++ hbase-native-client/connection/rpc-client.cc | 14 +++++++++----- hbase-native-client/connection/rpc-client.h | 1 + hbase-native-client/serde/rpc.cc | 1 + 5 files changed, 19 insertions(+), 5 deletions(-) diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index e022f9e..ee14c9d 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -117,3 +117,6 @@ void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) { found->second->Close(); connections_.erase(found); } + +void ConnectionPool::Close() { +} diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index 96d89ac..5101c68 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -75,6 +75,11 @@ class ConnectionPool { */ void Close(std::shared_ptr<ConnectionId> remote_id); + /** + * Close the Connection Pool + */ + void Close(); + private: std::shared_ptr<RpcConnection> GetCachedConnection( std::shared_ptr<ConnectionId> remote_id); diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc index 66ec231..3f0cfaf 100644 --- a/hbase-native-client/connection/rpc-client.cc +++ b/hbase-native-client/connection/rpc-client.cc @@ -43,13 +43,15 @@ class RpcChannelImplementation : public AbstractRpcChannel { } // namespace hbase RpcClient::RpcClient() { - auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>( + io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>( sysconf(_SC_NPROCESSORS_ONLN)); - cp_ = std::make_shared<ConnectionPool>(io_executor); + cp_ = std::make_shared<ConnectionPool>(io_executor_); } -void RpcClient::Close() {} +void RpcClient::Close() { + io_executor_->stop(); +} std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host, uint16_t port, @@ -114,6 +116,8 @@ void RpcClient::CallMethod(const MethodDescriptor* method, std::unique_ptr<Request> req = std::make_unique<Request>(shared_req, shared_resp, method->name()); - AsyncCall(host, port, std::move(req), ticket) - .then([done, this](Response resp) { done->Run(); }); + AsyncCall(host, port, std::move(req), ticket, method->service()->name()) + .then([done, this](Response resp) { + done->Run(); + }); } diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h index c24db9d..dbf857d 100644 --- a/hbase-native-client/connection/rpc-client.h +++ b/hbase-native-client/connection/rpc-client.h @@ -91,6 +91,7 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> { private: std::shared_ptr<ConnectionPool> cp_; + std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; }; class AbstractRpcChannel : public RpcChannel { diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc.cc index d863d50..e4ade22 100644 --- a/hbase-native-client/serde/rpc.cc +++ b/hbase-native-client/serde/rpc.cc @@ -110,6 +110,7 @@ unique_ptr<IOBuf> RpcSerde::Header(const string &user) { // That may or may not be the correct thing to do. // It worked for a while with the java client; until it // didn't. + // TODO: send the service name and user from the RpcClient h.set_service_name(INTERFACE); return PrependLength(SerializeMessage(h)); }
