Repository: hbase Updated Branches: refs/heads/HBASE-14850 5dbc1b5db -> 5459e0d6b
HBASE-17051 [C++] implement RPC client and connection management (Xiaobing Zhou) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5459e0d6 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5459e0d6 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5459e0d6 Branch: refs/heads/HBASE-14850 Commit: 5459e0d6beab8adaf5cb32bb94d3bbbc7a90eb46 Parents: 5dbc1b5 Author: Enis Soztutar <[email protected]> Authored: Mon Dec 19 18:38:51 2016 -0800 Committer: Enis Soztutar <[email protected]> Committed: Mon Dec 19 18:38:51 2016 -0800 ---------------------------------------------------------------------- hbase-native-client/Makefile | 2 +- hbase-native-client/connection/BUCK | 3 + hbase-native-client/connection/connection-id.h | 88 ++++++++++++++ .../connection/connection-pool-test.cc | 35 +++--- .../connection/connection-pool.cc | 47 ++++---- .../connection/connection-pool.h | 50 ++++---- hbase-native-client/connection/rpc-client.cc | 119 +++++++++++++++++++ hbase-native-client/connection/rpc-client.h | 116 ++++++++++++++++++ hbase-native-client/connection/rpc-connection.h | 58 +++++++++ hbase-native-client/core/location-cache.cc | 17 ++- hbase-native-client/security/BUCK | 30 +++++ hbase-native-client/security/user.h | 39 ++++++ 12 files changed, 531 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/Makefile ---------------------------------------------------------------------- diff --git a/hbase-native-client/Makefile b/hbase-native-client/Makefile index be5d461..96c275e 100644 --- a/hbase-native-client/Makefile +++ b/hbase-native-client/Makefile @@ -22,7 +22,7 @@ LD:=g++ DEBUG_PATH = build/debug RELEASE_PATH = build/release PROTO_SRC_DIR = build/if -MODULES = connection core serde test-util utils +MODULES = connection core serde test-util utils security SRC_DIR = $(MODULES) DEBUG_BUILD_DIR = $(addprefix $(DEBUG_PATH)/,$(MODULES)) RELEASE_BUILD_DIR = $(addprefix $(RELEASE_PATH)/,$(MODULES)) http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index f093d5a..c22cc89 100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -24,8 +24,10 @@ cxx_library( "client-handler.h", "connection-factory.h", "connection-pool.h", + "connection-id.h", "pipeline.h", "request.h", + "rpc-connection.h", "response.h", "service.h", ], @@ -41,6 +43,7 @@ cxx_library( "//if:if", "//utils:utils", "//serde:serde", + "//security:security", "//third-party:folly", "//third-party:wangle", ], http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/connection-id.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-id.h b/hbase-native-client/connection/connection-id.h new file mode 100644 index 0000000..62fe222 --- /dev/null +++ b/hbase-native-client/connection/connection-id.h @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include "if/HBase.pb.h" +#include "security/user.h" + +#include <memory> +#include <utility> +#include <boost/functional/hash.hpp> + +using hbase::pb::ServerName; +using hbase::security::User; + +namespace hbase { +class ConnectionId { + public: + ConnectionId(const std::string &host, uint16_t port) + : ConnectionId(host, port, User::defaultUser(), "") {} + + ConnectionId(const std::string &host, uint16_t port, + std::shared_ptr<User> user) + : ConnectionId(host, port, user, "") {} + + ConnectionId(const std::string &host, uint16_t port, + std::shared_ptr<User> user, const std::string &service_name) + : user_(user), service_name_(service_name), host_(host), port_(port) {} + + virtual ~ConnectionId() = default; + + std::shared_ptr<User> user() const { return user_; } + std::string service_name() const { return service_name_; } + std::string host() { return host_; } + uint16_t port() { return port_; } + + private: + std::shared_ptr<User> user_; + std::string service_name_; + std::string host_; + uint16_t port_; +}; + +/* Equals function for ConnectionId */ +struct ConnectionIdEquals { + /** equals */ + bool operator()(const std::shared_ptr<ConnectionId> &lhs, + const std::shared_ptr<ConnectionId> &rhs) const { + return userEquals(lhs->user(), rhs->user()) && lhs->host() == rhs->host() && + lhs->port() == rhs->port(); + } + + private: + bool userEquals(const std::shared_ptr<User> &lhs, + const std::shared_ptr<User> &rhs) const { + return lhs == nullptr ? rhs == nullptr + : (rhs == nullptr ? false : lhs->user_name() == + rhs->user_name()); + } +}; + +/** Hash for ConnectionId. */ +struct ConnectionIdHash { + /** hash */ + std::size_t operator()(const std::shared_ptr<ConnectionId> &ci) const { + std::size_t h = 0; + boost::hash_combine(h, ci->user() == nullptr ? 0 : ci->user()->user_name()); + boost::hash_combine(h, ci->host()); + boost::hash_combine(h, ci->port()); + return h; + } +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/connection-pool-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc index e17c16c..4547b30 100644 --- a/hbase-native-client/connection/connection-pool-test.cc +++ b/hbase-native-client/connection/connection-pool-test.cc @@ -17,20 +17,22 @@ * */ +#include "connection/connection-id.h" #include "connection/connection-pool.h" - -#include <folly/Logging.h> -#include <gmock/gmock.h> - #include "connection/connection-factory.h" + #include "if/HBase.pb.h" #include "serde/server-name.h" +#include <folly/Logging.h> +#include <gmock/gmock.h> + using namespace hbase; using hbase::pb::ServerName; using ::testing::Return; using ::testing::_; +using hbase::ConnectionId; class MockConnectionFactory : public ConnectionFactory { public: @@ -75,13 +77,10 @@ TEST(TestConnectionPool, TestOnlyCreateOnce) { .WillRepeatedly(Return(mock_boot)); ConnectionPool cp{mock_cf}; - ServerName sn; - sn.set_host_name(hostname); - sn.set_port(port); - - auto result = cp.Get(sn); + auto remote_id = std::make_shared<ConnectionId>(hostname, port); + auto result = cp.GetConnection(remote_id); ASSERT_TRUE(result != nullptr); - result = cp.Get(sn); + result = cp.GetConnection(remote_id); } TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) { @@ -102,13 +101,13 @@ TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) { ConnectionPool cp{mock_cf}; { - auto result_one = cp.Get(folly::to<ServerName>( - hostname_one + ":" + folly::to<std::string>(port))); - auto result_two = cp.Get(folly::to<ServerName>( - hostname_two + ":" + folly::to<std::string>(port))); + auto remote_id = std::make_shared<ConnectionId>(hostname_one, port); + auto result_one = cp.GetConnection(remote_id); + auto remote_id2 = std::make_shared<ConnectionId>(hostname_two, port); + auto result_two = cp.GetConnection(remote_id2); } - auto result_one = cp.Get( - folly::to<ServerName>(hostname_one + ":" + folly::to<std::string>(port))); - auto result_two = cp.Get( - folly::to<ServerName>(hostname_two + ":" + folly::to<std::string>(port))); + auto remote_id = std::make_shared<ConnectionId>(hostname_one, port); + auto result_one = cp.GetConnection(remote_id); + auto remote_id2 = std::make_shared<ConnectionId>(hostname_two, port); + auto result_two = cp.GetConnection(remote_id2); } http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/connection-pool.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index c216d0b..e022f9e 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -28,7 +28,6 @@ using std::mutex; using std::unique_ptr; using std::shared_ptr; -using hbase::pb::ServerName; using hbase::ConnectionPool; using hbase::HBaseService; using folly::SharedMutexWritePriority; @@ -47,33 +46,36 @@ ConnectionPool::~ConnectionPool() { SharedMutexWritePriority::WriteHolder holder(map_mutex_); for (auto &item : connections_) { auto &con = item.second; - con->close(); + con->Close(); } connections_.clear(); clients_.clear(); } -std::shared_ptr<HBaseService> ConnectionPool::Get(const ServerName &sn) { +std::shared_ptr<RpcConnection> ConnectionPool::GetConnection( + std::shared_ptr<ConnectionId> remote_id) { // Try and get th cached connection. - auto found_ptr = GetCached(sn); + auto found_ptr = GetCachedConnection(remote_id); // If there's no connection then create it. if (found_ptr == nullptr) { - found_ptr = GetNew(sn); + found_ptr = GetNewConnection(remote_id); } return found_ptr; } -std::shared_ptr<HBaseService> ConnectionPool::GetCached(const ServerName &sn) { +std::shared_ptr<RpcConnection> ConnectionPool::GetCachedConnection( + std::shared_ptr<ConnectionId> remote_id) { SharedMutexWritePriority::ReadHolder holder(map_mutex_); - auto found = connections_.find(sn); + auto found = connections_.find(remote_id); if (found == connections_.end()) { return nullptr; } return found->second; } -std::shared_ptr<HBaseService> ConnectionPool::GetNew(const ServerName &sn) { +std::shared_ptr<RpcConnection> ConnectionPool::GetNewConnection( + std::shared_ptr<ConnectionId> remote_id) { // Grab the upgrade lock. While we are double checking other readers can // continue on SharedMutexWritePriority::UpgradeHolder u_holder{map_mutex_}; @@ -81,7 +83,7 @@ std::shared_ptr<HBaseService> ConnectionPool::GetNew(const ServerName &sn) { // Now check if someone else created the connection before we got the lock // This is safe since we hold the upgrade lock. // upgrade lock is more power than the reader lock. - auto found = connections_.find(sn); + auto found = connections_.find(remote_id); if (found != connections_.end() && found->second != nullptr) { return found->second; } else { @@ -89,24 +91,29 @@ std::shared_ptr<HBaseService> ConnectionPool::GetNew(const ServerName &sn) { SharedMutexWritePriority::WriteHolder w_holder{std::move(u_holder)}; // Make double sure there are not stale connections hanging around. - connections_.erase(sn); - - // Nope we are the ones who should create the new connection. - auto client = cf_->MakeBootstrap(); - auto dispatcher = cf_->Connect(client, sn.host_name(), sn.port()); - clients_.insert(std::make_pair(sn, client)); - connections_.insert(std::make_pair(sn, dispatcher)); - return dispatcher; + connections_.erase(remote_id); + + /* create new connection */ + auto clientBootstrap = cf_->MakeBootstrap(); + auto dispatcher = + cf_->Connect(clientBootstrap, remote_id->host(), remote_id->port()); + + auto conneciton = std::make_shared<RpcConnection>(remote_id, dispatcher); + + connections_.insert(std::make_pair(remote_id, conneciton)); + clients_.insert(std::make_pair(remote_id, clientBootstrap)); + + return conneciton; } } -void ConnectionPool::Close(const ServerName &sn) { +void ConnectionPool::Close(std::shared_ptr<ConnectionId> remote_id) { SharedMutexWritePriority::WriteHolder holder{map_mutex_}; - auto found = connections_.find(sn); + auto found = connections_.find(remote_id); if (found == connections_.end() || found->second == nullptr) { return; } - auto service = found->second; + found->second->Close(); connections_.erase(found); } http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/connection-pool.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index 2624336..96d89ac 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -25,38 +25,24 @@ #include <unordered_map> #include "connection/connection-factory.h" +#include "connection/connection-id.h" +#include "connection/rpc-connection.h" #include "connection/service.h" #include "if/HBase.pb.h" -namespace hbase { -/** Equals function for server name that ignores start time */ -struct ServerNameEquals { - /** equals */ - bool operator()(const hbase::pb::ServerName &lhs, - const hbase::pb::ServerName &rhs) const { - return lhs.host_name() == rhs.host_name() && lhs.port() == rhs.port(); - } -}; +using hbase::ConnectionId; +using hbase::ConnectionIdEquals; +using hbase::ConnectionIdHash; +using hbase::RpcConnection; -/** Hash for ServerName that ignores the start time. */ -struct ServerNameHash { - /** hash */ - std::size_t operator()(hbase::pb::ServerName const &s) const { - std::size_t h = 0; - boost::hash_combine(h, s.host_name()); - boost::hash_combine(h, s.port()); - return h; - } -}; +namespace hbase { /** * @brief Connection pooling for HBase rpc connection. * * This is a thread safe connection pool. It allows getting - * a shared connection to HBase by server name. This is - * useful for keeping a single connection no matter how many regions a - * regionserver has on it. + * a shared rpc connection to HBase servers by connection id. */ class ConnectionPool { public: @@ -81,23 +67,27 @@ class ConnectionPool { * Get a connection to the server name. Start time is ignored. * This can be a blocking operation for a short time. */ - std::shared_ptr<HBaseService> Get(const hbase::pb::ServerName &sn); + std::shared_ptr<RpcConnection> GetConnection( + std::shared_ptr<ConnectionId> remote_id); /** * Close/remove a connection. */ - void Close(const hbase::pb::ServerName &sn); + void Close(std::shared_ptr<ConnectionId> remote_id); private: - std::shared_ptr<HBaseService> GetCached(const hbase::pb::ServerName &sn); - std::shared_ptr<HBaseService> GetNew(const hbase::pb::ServerName &sn); - std::unordered_map<hbase::pb::ServerName, std::shared_ptr<HBaseService>, - ServerNameHash, ServerNameEquals> + std::shared_ptr<RpcConnection> GetCachedConnection( + std::shared_ptr<ConnectionId> remote_id); + std::shared_ptr<RpcConnection> GetNewConnection( + std::shared_ptr<ConnectionId> remote_id); + std::unordered_map<std::shared_ptr<ConnectionId>, + std::shared_ptr<RpcConnection>, ConnectionIdHash, + ConnectionIdEquals> connections_; std::unordered_map< - hbase::pb::ServerName, + std::shared_ptr<ConnectionId>, std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>, - ServerNameHash, ServerNameEquals> + ConnectionIdHash, ConnectionIdEquals> clients_; folly::SharedMutexWritePriority map_mutex_; std::shared_ptr<ConnectionFactory> cf_; http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/rpc-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc new file mode 100644 index 0000000..66ec231 --- /dev/null +++ b/hbase-native-client/connection/rpc-client.cc @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "connection/rpc-client.h" +#include <unistd.h> +#include <wangle/concurrent/IOThreadPoolExecutor.h> + +using hbase::RpcClient; +using hbase::AbstractRpcChannel; + +namespace hbase { + +class RpcChannelImplementation : public AbstractRpcChannel { + public: + RpcChannelImplementation(std::shared_ptr<RpcClient> rpc_client, + const std::string& host, uint16_t port, + std::shared_ptr<User> ticket, int rpc_timeout) + : AbstractRpcChannel(rpc_client, host, port, ticket, rpc_timeout) {} + + void CallMethod(const MethodDescriptor* method, RpcController* controller, + const Message* request, Message* response, + Closure* done) override { + rpc_client_->CallMethod(method, controller, request, response, done, host_, + port_, ticket_); + } +}; +} // namespace hbase + +RpcClient::RpcClient() { + auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>( + sysconf(_SC_NPROCESSORS_ONLN)); + + cp_ = std::make_shared<ConnectionPool>(io_executor); +} + +void RpcClient::Close() {} + +std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host, + uint16_t port, + std::unique_ptr<Request> req, + std::shared_ptr<User> ticket) { + return std::make_shared<Response>( + AsyncCall(host, port, std::move(req), ticket).get()); +} + +std::shared_ptr<Response> RpcClient::SyncCall(const std::string& host, + uint16_t port, + std::unique_ptr<Request> req, + std::shared_ptr<User> ticket, + const std::string& service_name) { + return std::make_shared<Response>( + AsyncCall(host, port, std::move(req), ticket, service_name).get()); +} + +folly::Future<Response> RpcClient::AsyncCall(const std::string& host, + uint16_t port, + std::unique_ptr<Request> req, + std::shared_ptr<User> ticket) { + auto remote_id = std::make_shared<ConnectionId>(host, port, ticket); + return GetConnection(remote_id)->SendRequest(std::move(req)); +} + +folly::Future<Response> RpcClient::AsyncCall(const std::string& host, + uint16_t port, + std::unique_ptr<Request> req, + std::shared_ptr<User> ticket, + const std::string& service_name) { + auto remote_id = + std::make_shared<ConnectionId>(host, port, ticket, service_name); + return GetConnection(remote_id)->SendRequest(std::move(req)); +} + +std::shared_ptr<RpcConnection> RpcClient::GetConnection( + std::shared_ptr<ConnectionId> remote_id) { + return cp_->GetConnection(remote_id); +} + +std::shared_ptr<RpcChannel> RpcClient::CreateRpcChannel( + const std::string& host, uint16_t port, std::shared_ptr<User> ticket, + int rpc_timeout) { + std::shared_ptr<RpcChannelImplementation> channel = + std::make_shared<RpcChannelImplementation>(shared_from_this(), host, port, + ticket, rpc_timeout); + + /* static_pointer_cast is safe since RpcChannelImplementation derives + * from RpcChannel, otherwise, dynamic_pointer_cast should be used. */ + return std::static_pointer_cast<RpcChannel>(channel); +} + +void RpcClient::CallMethod(const MethodDescriptor* method, + RpcController* controller, const Message* req_msg, + Message* resp_msg, Closure* done, + const std::string& host, uint16_t port, + std::shared_ptr<User> ticket) { + std::shared_ptr<Message> shared_req(const_cast<Message*>(req_msg)); + std::shared_ptr<Message> shared_resp(resp_msg); + + std::unique_ptr<Request> req = + std::make_unique<Request>(shared_req, shared_resp, method->name()); + + AsyncCall(host, port, std::move(req), ticket) + .then([done, this](Response resp) { done->Run(); }); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/rpc-client.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h new file mode 100644 index 0000000..c24db9d --- /dev/null +++ b/hbase-native-client/connection/rpc-client.h @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include "connection/connection-id.h" +#include "connection/connection-pool.h" +#include "connection/request.h" +#include "connection/response.h" +#include "security/user.h" + +#include <google/protobuf/service.h> + +using hbase::security::User; +using hbase::pb::ServerName; +using hbase::Request; +using hbase::Response; +using hbase::ConnectionId; +using hbase::ConnectionPool; +using hbase::RpcConnection; +using hbase::security::User; + +using google::protobuf::MethodDescriptor; +using google::protobuf::RpcChannel; +using google::protobuf::Message; +using google::protobuf::RpcController; +using google::protobuf::Closure; + +class RpcChannelImplementation; + +namespace hbase { + +class RpcClient : public std::enable_shared_from_this<RpcClient> { + friend class RpcChannelImplementation; + + public: + RpcClient(); + + virtual ~RpcClient() { Close(); } + + virtual std::shared_ptr<Response> SyncCall(const std::string &host, + uint16_t port, + std::unique_ptr<Request> req, + std::shared_ptr<User> ticket); + + virtual std::shared_ptr<Response> SyncCall(const std::string &host, + uint16_t port, + std::unique_ptr<Request> req, + std::shared_ptr<User> ticket, + const std::string &service_name); + + virtual folly::Future<Response> AsyncCall(const std::string &host, + uint16_t port, + std::unique_ptr<Request> req, + std::shared_ptr<User> ticket); + + virtual folly::Future<Response> AsyncCall(const std::string &host, + uint16_t port, + std::unique_ptr<Request> req, + std::shared_ptr<User> ticket, + const std::string &service_name); + + virtual void Close(); + + virtual std::shared_ptr<RpcChannel> CreateRpcChannel( + const std::string &host, uint16_t port, std::shared_ptr<User> ticket, + int rpc_timeout); + + private: + void CallMethod(const MethodDescriptor *method, RpcController *controller, + const Message *req_msg, Message *resp_msg, Closure *done, + const std::string &host, uint16_t port, + std::shared_ptr<User> ticket); + std::shared_ptr<RpcConnection> GetConnection( + std::shared_ptr<ConnectionId> remote_id); + + private: + std::shared_ptr<ConnectionPool> cp_; +}; + +class AbstractRpcChannel : public RpcChannel { + public: + AbstractRpcChannel(std::shared_ptr<RpcClient> rpc_client, + const std::string &host, uint16_t port, + std::shared_ptr<User> ticket, int rpc_timeout) + : rpc_client_(rpc_client), + host_(host), + port_(port), + ticket_(ticket), + rpc_timeout_(rpc_timeout) {} + + virtual ~AbstractRpcChannel() = default; + + protected: + std::shared_ptr<RpcClient> rpc_client_; + std::string host_; + uint16_t port_; + std::shared_ptr<User> ticket_; + int rpc_timeout_; +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/connection/rpc-connection.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-connection.h b/hbase-native-client/connection/rpc-connection.h new file mode 100644 index 0000000..2e06ec3 --- /dev/null +++ b/hbase-native-client/connection/rpc-connection.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include "connection/connection-id.h" +#include "connection/request.h" +#include "connection/response.h" +#include "connection/service.h" + +#include <memory> +#include <utility> + +using hbase::HBaseService; + +namespace hbase { +class RpcConnection { + public: + RpcConnection(std::shared_ptr<ConnectionId> connection_id, + std::shared_ptr<HBaseService> hbase_service) + : connection_id_(connection_id), hbase_service_(hbase_service) {} + + virtual ~RpcConnection() { Close(); } + + virtual std::shared_ptr<ConnectionId> remote_id() const { + return connection_id_; + } + + virtual std::shared_ptr<HBaseService> get_service() const { + return hbase_service_; + } + + virtual folly::Future<Response> SendRequest(std::unique_ptr<Request> req) { + return (*hbase_service_)(std::move(req)); + } + + virtual void Close() { hbase_service_->close(); } + + private: + std::shared_ptr<ConnectionId> connection_id_; + std::shared_ptr<HBaseService> hbase_service_; +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/core/location-cache.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index a0ca5ca..4c29a61 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -26,6 +26,7 @@ #include <utility> #include "connection/response.h" +#include "connection/rpc-connection.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" #include "serde/region-info.h" @@ -34,6 +35,7 @@ using namespace std; using namespace folly; +using hbase::RpcConnection; using wangle::ServiceFilter; using hbase::Request; @@ -121,9 +123,14 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta( const TableName &tn, const string &row) { return this->LocateMeta() .via(cpu_executor_.get()) - .then([this](ServerName sn) { return this->cp_.Get(sn); }) - .then([tn, row, this](std::shared_ptr<HBaseService> service) { - return (*service)(std::move(meta_util_.MetaRequest(tn, row))); + .then([this](ServerName sn) { + auto remote_id = + std::make_shared<ConnectionId>(sn.host_name(), sn.port()); + return this->cp_.GetConnection(remote_id); + }) + .then([tn, row, this](std::shared_ptr<RpcConnection> rpc_connection) { + return (*rpc_connection->get_service())( + std::move(meta_util_.MetaRequest(tn, row))); }) .then([this](Response resp) { // take the protobuf response and make it into @@ -139,8 +146,10 @@ Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta( return rl; }) .then([this](std::shared_ptr<RegionLocation> rl) { + auto remote_id = std::make_shared<ConnectionId>( + rl->server_name().host_name(), rl->server_name().port()); // Now fill out the connection. - rl->set_service(cp_.Get(rl->server_name())); + rl->set_service(cp_.GetConnection(remote_id)->get_service()); return rl; }); } http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/security/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/security/BUCK b/hbase-native-client/security/BUCK new file mode 100644 index 0000000..5b935d3 --- /dev/null +++ b/hbase-native-client/security/BUCK @@ -0,0 +1,30 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is the library dealing with a single connection +# to a single server. +cxx_library( + name="security", + exported_headers=[ + "user.h", + ], + srcs=[ + ], + deps=[ + ], + compiler_flags=['-Weffc++'], + visibility=['//core/...','//connection/...'],) http://git-wip-us.apache.org/repos/asf/hbase/blob/5459e0d6/hbase-native-client/security/user.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/security/user.h b/hbase-native-client/security/user.h new file mode 100644 index 0000000..795f5ac --- /dev/null +++ b/hbase-native-client/security/user.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <string> + +namespace hbase { +namespace security { +class User { +public: + explicit User(const std::string& user_name) : user_name_(user_name) {} + virtual ~User() = default; + + std::string user_name() {return user_name_;} + + static std::shared_ptr<User> defaultUser() { + return std::make_shared<User>("__drwho"); + } +private: + std::string user_name_; +}; +} +}
