HBASE-15766 Show working puts Summary: Add an example showing how a set of puts to a single connection will work. This still needs retries and a lookup of which region each request is going to.
Test Plan: ./buck-out/gen/core/simple-client -columns 100 ../bin/hbase shell count 't' 100 row(s) in 0.2470 seconds Differential Revision: https://reviews.facebook.net/D57603 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/79b5085d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/79b5085d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/79b5085d Branch: refs/heads/HBASE-14850 Commit: 79b5085db5ca6283714ec5be2f5439d4d5d5eb72 Parents: f034294 Author: Elliott Clark <ecl...@apache.org> Authored: Wed May 4 01:54:21 2016 -0700 Committer: Elliott Clark <ecl...@apache.org> Committed: Mon Jul 11 16:47:26 2016 -0700 ---------------------------------------------------------------------- .../connection/client-dispatcher.cc | 9 +-- .../connection/client-dispatcher.h | 11 ++- .../connection/client-handler.cc | 13 ++-- hbase-native-client/connection/client-handler.h | 4 +- .../connection/connection-factory.cc | 46 ++++++------ .../connection/connection-factory.h | 11 ++- .../connection/connection-pool-test.cc | 54 +++++++++++--- .../connection/connection-pool.cc | 33 +++++++-- .../connection/connection-pool.h | 10 ++- hbase-native-client/core/client.cc | 6 +- hbase-native-client/core/location-cache.cc | 25 +++---- hbase-native-client/core/location-cache.h | 6 +- hbase-native-client/core/meta-utils.cc | 6 +- hbase-native-client/core/meta-utils.h | 4 +- hbase-native-client/core/region-location.h | 8 ++- hbase-native-client/core/simple-client.cc | 76 +++++++++++++++++--- .../serde/region-info-deserializer-test.cc | 1 - hbase-native-client/serde/region-info.h | 6 +- 18 files changed, 231 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/client-dispatcher.cc ---------------------------------------------------------------------- diff --git 
a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index 6e2dc54..655d765 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -22,10 +22,11 @@ using namespace folly; using namespace hbase; using namespace wangle; -ClientDispatcher::ClientDispatcher() : requests_(), current_call_id_(9) {} +ClientDispatcher::ClientDispatcher() : requests_(5000), current_call_id_(9) {} void ClientDispatcher::read(Context *ctx, Response in) { auto call_id = in.call_id(); + auto search = requests_.find(call_id); CHECK(search != requests_.end()); auto p = std::move(search->second); @@ -38,10 +39,10 @@ void ClientDispatcher::read(Context *ctx, Response in) { } Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) { - auto call_id = ++current_call_id_; - + auto call_id = current_call_id_++; arg->set_call_id(call_id); - auto &p = requests_[call_id]; + requests_.insert(call_id, Promise<Response>{}); + auto &p = requests_.find(call_id)->second; auto f = p.getFuture(); p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { LOG(ERROR) << "e = " << call_id; http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/client-dispatcher.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-dispatcher.h b/hbase-native-client/connection/client-dispatcher.h index 826fc6a..4435a1b 100644 --- a/hbase-native-client/connection/client-dispatcher.h +++ b/hbase-native-client/connection/client-dispatcher.h @@ -19,8 +19,12 @@ #pragma once +#include <folly/AtomicHashMap.h> +#include <folly/Logging.h> #include <wangle/service/ClientDispatcher.h> +#include <atomic> + #include "connection/pipeline.h" #include "connection/request.h" #include "connection/response.h" @@ -31,13 +35,16 @@ class ClientDispatcher std::unique_ptr<Request>, Response> { 
public: ClientDispatcher(); + ~ClientDispatcher() { + LOG(ERROR) << "Killing ClientDispatcher call_id = " << current_call_id_; + } void read(Context *ctx, Response in) override; folly::Future<Response> operator()(std::unique_ptr<Request> arg) override; folly::Future<folly::Unit> close(Context *ctx) override; folly::Future<folly::Unit> close() override; private: - std::unordered_map<uint32_t, folly::Promise<Response>> requests_; + folly::AtomicHashMap<uint32_t, folly::Promise<Response>> requests_; // Start at some number way above what could // be there for un-initialized call id counters. // @@ -46,6 +53,6 @@ private: // // uint32_t has a max of 4Billion so 10 more or less is // not a big deal. - uint32_t current_call_id_; + std::atomic<uint32_t> current_call_id_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/client-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index 496e4f2..b92ad89 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -37,7 +37,10 @@ using hbase::pb::GetResponse; using google::protobuf::Message; ClientHandler::ClientHandler(std::string user_name) - : user_name_(user_name), need_send_header_(true), serde_(), resp_msgs_() {} + : user_name_(user_name), need_send_header_(true), serde_(), + resp_msgs_( + make_unique<folly::AtomicHashMap< + uint32_t, std::shared_ptr<google::protobuf::Message>>>(5000)) {} void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { if (LIKELY(buf != nullptr)) { @@ -51,14 +54,14 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { << " has_exception=" << header.has_exception(); // Get the response protobuf from the map - auto search = resp_msgs_.find(header.call_id()); + auto search = 
resp_msgs_->find(header.call_id()); // It's an error if it's not there. - CHECK(search != resp_msgs_.end()); + CHECK(search != resp_msgs_->end()); auto resp_msg = search->second; CHECK(resp_msg != nullptr); // Make sure we don't leak the protobuf - resp_msgs_.erase(search); + resp_msgs_->erase(header.call_id()); // set the call_id. // This will be used to by the dispatcher to match up @@ -96,7 +99,7 @@ Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) { ctx->fireWrite(std::move(pre)); } - resp_msgs_[r->call_id()] = r->resp_msg(); + resp_msgs_->insert(r->call_id(), r->resp_msg()); return ctx->fireWrite( serde_.Request(r->call_id(), r->method(), r->req_msg().get())); } http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/client-handler.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index ce99c9e..be5143c 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -18,6 +18,7 @@ */ #pragma once +#include <folly/AtomicHashMap.h> #include <wangle/channel/Handler.h> #include <string> @@ -51,7 +52,8 @@ private: RpcSerde serde_; // in flight requests - std::unordered_map<uint32_t, std::shared_ptr<google::protobuf::Message>> + std::unique_ptr<folly::AtomicHashMap< + uint32_t, std::shared_ptr<google::protobuf::Message>>> resp_msgs_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/connection-factory.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index 9102d60..635d12d 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -19,40 +19,36 @@ 
#include "connection/connection-factory.h" -#include <folly/futures/Future.h> -#include <wangle/bootstrap/ClientBootstrap.h> -#include <wangle/channel/AsyncSocketHandler.h> -#include <wangle/channel/EventBaseHandler.h> -#include <wangle/channel/OutputBufferingHandler.h> -#include <wangle/service/ClientDispatcher.h> -#include <wangle/service/CloseOnReleaseFilter.h> -#include <wangle/service/ExpiringFilter.h> - -#include <string> +#include <wangle/concurrent/GlobalExecutor.h> #include "connection/client-dispatcher.h" #include "connection/pipeline.h" -#include "connection/request.h" -#include "connection/response.h" #include "connection/service.h" using namespace folly; using namespace hbase; -using namespace wangle; -ConnectionFactory::ConnectionFactory() : bootstrap_() { - bootstrap_.group(std::make_shared<wangle::IOThreadPoolExecutor>(1)); - bootstrap_.pipelineFactory(std::make_shared<RpcPipelineFactory>()); -} +ConnectionFactory::ConnectionFactory() + : io_pool_(std::static_pointer_cast<wangle::IOThreadPoolExecutor>( + wangle::getIOExecutor())), + pipeline_factory_(std::make_shared<RpcPipelineFactory>()) {} -std::shared_ptr<HBaseService> -ConnectionFactory::make_connection(const std::string &host, int port) { - // Connect to a given server - // Then when connected create a ClientDispactcher. - auto pipeline = bootstrap_.connect(SocketAddress(host, port, true)).get(); +std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> +ConnectionFactory::MakeBootstrap() { + auto client = std::make_shared<wangle::ClientBootstrap<SerializePipeline>>(); + client->group(io_pool_); + client->pipelineFactory(pipeline_factory_); + + return client; +} +std::shared_ptr<HBaseService> ConnectionFactory::Connect( + std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client, + const std::string &hostname, int port) { + // Yes this will block however it makes dealing with connection pool soooooo + // much nicer. + // TODO see about using shared promise for this. 
+ auto pipeline = client->connect(SocketAddress(hostname, port, true)).get(); auto dispatcher = std::make_shared<ClientDispatcher>(); dispatcher->setPipeline(pipeline); - auto service = std::make_shared< - CloseOnReleaseFilter<std::unique_ptr<Request>, Response>>(dispatcher); - return service; + return dispatcher; } http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/connection-factory.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h index fc4e161..2284a7c 100644 --- a/hbase-native-client/connection/connection-factory.h +++ b/hbase-native-client/connection/connection-factory.h @@ -33,10 +33,15 @@ public: ConnectionFactory(); virtual ~ConnectionFactory() = default; - virtual std::shared_ptr<HBaseService> make_connection(const std::string &host, - int port); + virtual std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> + MakeBootstrap(); + + virtual std::shared_ptr<HBaseService> + Connect(std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> client, + const std::string &hostname, int port); private: - wangle::ClientBootstrap<SerializePipeline> bootstrap_; + std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool_; + std::shared_ptr<RpcPipelineFactory> pipeline_factory_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/connection-pool-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool-test.cc b/hbase-native-client/connection/connection-pool-test.cc index 975bc5e..b1a0ba0 100644 --- a/hbase-native-client/connection/connection-pool-test.cc +++ b/hbase-native-client/connection/connection-pool-test.cc @@ -24,6 +24,7 @@ #include "connection/connection-factory.h" #include "if/HBase.pb.h" +#include "serde/server-name.h" using namespace 
hbase; @@ -33,11 +34,16 @@ using ::testing::_; class MockConnectionFactory : public ConnectionFactory { public: - MOCK_METHOD2(make_connection, - std::shared_ptr<HBaseService>(const std::string &hostname, - int port)); + MOCK_METHOD0(MakeBootstrap, + std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>()); + MOCK_METHOD3(Connect, + std::shared_ptr<HBaseService>( + std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>, + const std::string &hostname, int port)); }; +class MockBootstrap : public wangle::ClientBootstrap<SerializePipeline> {}; + class MockServiceBase : public HBaseService { public: folly::Future<Response> operator()(std::unique_ptr<Request> req) override { @@ -54,19 +60,20 @@ public: }; TEST(TestConnectionPool, TestOnlyCreateOnce) { - std::string hostname{"hostname"}; + auto hostname = std::string{"hostname"}; + auto mock_boot = std::make_shared<MockBootstrap>(); auto mock_service = std::make_shared<MockService>(); + auto mock_cf = std::make_shared<MockConnectionFactory>(); uint32_t port{999}; - LOG(ERROR) << "About to make a MockConnectionFactory"; - auto mock_cf = std::make_shared<MockConnectionFactory>(); - EXPECT_CALL((*mock_cf), make_connection(_, _)) + EXPECT_CALL((*mock_cf), Connect(_, _, _)) .Times(1) .WillRepeatedly(Return(mock_service)); + EXPECT_CALL((*mock_cf), MakeBootstrap()) + .Times(1) + .WillRepeatedly(Return(mock_boot)); ConnectionPool cp{mock_cf}; - LOG(ERROR) << "Created ConnectionPool"; - ServerName sn; sn.set_host_name(hostname); sn.set_port(port); @@ -75,3 +82,32 @@ TEST(TestConnectionPool, TestOnlyCreateOnce) { ASSERT_TRUE(result != nullptr); result = cp.get(sn); } + +TEST(TestConnectionPool, TestOnlyCreateMultipleDispose) { + std::string hostname_one{"hostname"}; + std::string hostname_two{"hostname_two"}; + uint32_t port{999}; + + auto mock_boot = std::make_shared<MockBootstrap>(); + auto mock_service = std::make_shared<MockService>(); + auto mock_cf = std::make_shared<MockConnectionFactory>(); + + 
EXPECT_CALL((*mock_cf), Connect(_, _, _)) + .Times(2) + .WillRepeatedly(Return(mock_service)); + EXPECT_CALL((*mock_cf), MakeBootstrap()) + .Times(2) + .WillRepeatedly(Return(mock_boot)); + ConnectionPool cp{mock_cf}; + + { + auto result_one = cp.get(folly::to<ServerName>( + hostname_one + ":" + folly::to<std::string>(port))); + auto result_two = cp.get(folly::to<ServerName>( + hostname_two + ":" + folly::to<std::string>(port))); + } + auto result_one = cp.get( + folly::to<ServerName>(hostname_one + ":" + folly::to<std::string>(port))); + auto result_two = cp.get( + folly::to<ServerName>(hostname_two + ":" + folly::to<std::string>(port))); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/connection-pool.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index eafe60a..6ed5ad9 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -19,6 +19,7 @@ #include "connection/connection-pool.h" +#include <folly/SocketAddress.h> #include <wangle/service/Service.h> using std::mutex; @@ -26,28 +27,46 @@ using std::unique_ptr; using std::shared_ptr; using hbase::pb::ServerName; using folly::SharedMutexWritePriority; +using folly::SocketAddress; namespace hbase { ConnectionPool::ConnectionPool() - : cf_(std::make_shared<ConnectionFactory>()), connections_(), map_mutex_() { -} + : cf_(std::make_shared<ConnectionFactory>()), clients_(), connections_(), + map_mutex_() {} ConnectionPool::ConnectionPool(std::shared_ptr<ConnectionFactory> cf) - : cf_(cf), connections_(), map_mutex_() {} + : cf_(cf), clients_(), connections_(), map_mutex_() {} + +ConnectionPool::~ConnectionPool() { + SharedMutexWritePriority::WriteHolder holder(map_mutex_); + for (auto &item : connections_) { + auto &con = item.second; + con->close(); + } + 
connections_.clear(); + clients_.clear(); +} std::shared_ptr<HBaseService> ConnectionPool::get(const ServerName &sn) { + // Create a read lock. SharedMutexWritePriority::UpgradeHolder holder(map_mutex_); + auto found = connections_.find(sn); if (found == connections_.end() || found->second == nullptr) { + // Move the upgradable lock into the write lock if the connection + // hasn't been found. SharedMutexWritePriority::WriteHolder holder(std::move(holder)); - auto new_con = cf_->make_connection(sn.host_name(), sn.port()); - connections_[sn] = new_con; - return new_con; + auto client = cf_->MakeBootstrap(); + auto dispatcher = cf_->Connect(client, sn.host_name(), sn.port()); + clients_.insert(std::make_pair(sn, client)); + connections_.insert(std::make_pair(sn, dispatcher)); + return dispatcher; } return found->second; } + void ConnectionPool::close(const ServerName &sn) { - SharedMutexWritePriority::WriteHolder holder(map_mutex_); + SharedMutexWritePriority::WriteHolder holder{map_mutex_}; auto found = connections_.find(sn); if (found == connections_.end() || found->second == nullptr) { http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/connection/connection-pool.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index b8330e3..907afdb 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -37,23 +37,29 @@ struct ServerNameHash { std::size_t operator()(hbase::pb::ServerName const &s) const { std::size_t h1 = std::hash<std::string>()(s.host_name()); std::size_t h2 = std::hash<uint32_t>()(s.port()); - return h1 ^ (h2 << 1); + return h1 ^ (h2 << 2); } }; class ConnectionPool { public: ConnectionPool(); + ~ConnectionPool(); explicit ConnectionPool(std::shared_ptr<ConnectionFactory> cf); std::shared_ptr<HBaseService> get(const 
hbase::pb::ServerName &sn); void close(const hbase::pb::ServerName &sn); private: - std::shared_ptr<ConnectionFactory> cf_; std::unordered_map<hbase::pb::ServerName, std::shared_ptr<HBaseService>, ServerNameHash, ServerNameEquals> connections_; + std::unordered_map< + hbase::pb::ServerName, + std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>>, + ServerNameHash, ServerNameEquals> + clients_; folly::SharedMutexWritePriority map_mutex_; + std::shared_ptr<ConnectionFactory> cf_; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc index 266c239..4b9f844 100644 --- a/hbase-native-client/core/client.cc +++ b/hbase-native-client/core/client.cc @@ -33,8 +33,4 @@ using namespace folly; using namespace std; using namespace hbase::pb; -namespace hbase { - -Client::Client(string quorum_spec) - : location_cache_(quorum_spec, wangle::getCPUExecutor()) {} -} +namespace hbase {} http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/location-cache.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index 2667f11..e2a6251 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -25,8 +25,8 @@ #include "connection/response.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" -#include "serde/server-name.h" #include "serde/region-info.h" +#include "serde/server-name.h" #include "serde/zk.h" using namespace std; @@ -109,17 +109,22 @@ ServerName LocationCache::ReadMetaLocation() { Future<std::shared_ptr<RegionLocation>> LocationCache::LocateFromMeta(const TableName &tn, const string &row) { - auto exc = wangle::getIOExecutor(); + auto exec = 
wangle::getCPUExecutor(); return this->LocateMeta() - .then([&](ServerName sn) { return this->cp_.get(sn); }) - .via(exc.get()) // Need to handle all rpc's on the IOExecutor. + .via(exec.get()) + .then([ exec = exec, this ](ServerName sn) { return this->cp_.get(sn); }) .then([&](std::shared_ptr<HBaseService> service) { return (*service)(std::move(meta_util_.MetaRequest(tn, row))); }) - .then([&](Response resp) { + .then([this](Response resp) { // take the protobuf response and make it into // a region location. return this->CreateLocation(std::move(resp)); + }) + .then([ exec = exec, this ](std::shared_ptr<RegionLocation> rl) { + // Now fill out the connection. + rl->set_service(cp_.get(rl->server_name())); + return rl; }); } @@ -162,16 +167,12 @@ private: }; std::shared_ptr<RegionLocation> -LocationCache::CreateLocation(const Response &resp){ +LocationCache::CreateLocation(const Response &resp) { auto resp_msg = static_pointer_cast<ScanResponse>(resp.response()); auto &results = resp_msg->results().Get(0); auto &cells = results.cell(); - LOG(ERROR) << "resp_msg = " << resp_msg->DebugString(); auto ri = folly::to<RegionInfo>(cells.Get(0).value()); auto sn = folly::to<ServerName>(cells.Get(1).value()); - - LOG(ERROR) << "RegionInfo = " << ri.DebugString(); - LOG(ERROR) << "ServerName = " << sn.DebugString(); - auto wrapped = make_shared<RemoveServiceFilter>(cp_.get(sn), sn, this->cp_); - return std::make_shared<RegionLocation>(std::move(ri), std::move(sn), wrapped); + return std::make_shared<RegionLocation>(cells.Get(0).row(), std::move(ri), sn, + nullptr); } http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/location-cache.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h index 99b5e5e..7f76428 100644 --- a/hbase-native-client/core/location-cache.h +++ b/hbase-native-client/core/location-cache.h @@ -48,9 +48,10 
@@ public: // Meta Related Methods. // These are only public until testing is complete folly::Future<hbase::pb::ServerName> LocateMeta(); - folly::Future<std::shared_ptr<RegionLocation>> LocateFromMeta(const hbase::pb::TableName &tn, - const std::string &row); + folly::Future<std::shared_ptr<RegionLocation>> + LocateFromMeta(const hbase::pb::TableName &tn, const std::string &row); void InvalidateMeta(); + ConnectionPool cp_; private: void RefreshMetaLocation(); @@ -61,7 +62,6 @@ private: std::shared_ptr<folly::Executor> executor_; std::unique_ptr<folly::SharedPromise<hbase::pb::ServerName>> meta_promise_; std::mutex meta_lock_; - ConnectionPool cp_; MetaUtil meta_util_; // TODO: migrate this to a smart pointer with a deleter. http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/meta-utils.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc index 1325d83..23d2041 100644 --- a/hbase-native-client/core/meta-utils.cc +++ b/hbase-native-client/core/meta-utils.cc @@ -37,12 +37,12 @@ using hbase::pb::RegionSpecifier_RegionSpecifierType; static const std::string META_REGION = "1588230740"; std::string MetaUtil::RegionLookupRowkey(const TableName &tn, - const std::string &row) const { + const std::string &row) const { return folly::to<std::string>(tn, ",", row, ",", "999999999999999999"); } -std::unique_ptr<Request> -MetaUtil::MetaRequest(const TableName tn, const std::string &row) const { +std::unique_ptr<Request> MetaUtil::MetaRequest(const TableName tn, + const std::string &row) const { auto request = Request::scan(); auto msg = std::static_pointer_cast<ScanRequest>(request->req_msg()); http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/meta-utils.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/meta-utils.h 
b/hbase-native-client/core/meta-utils.h index 5a659f3..dfef065 100644 --- a/hbase-native-client/core/meta-utils.h +++ b/hbase-native-client/core/meta-utils.h @@ -29,8 +29,8 @@ namespace hbase { class MetaUtil { public: std::string RegionLookupRowkey(const hbase::pb::TableName &tn, - const std::string &row) const; + const std::string &row) const; std::unique_ptr<Request> MetaRequest(const hbase::pb::TableName tn, - const std::string &row) const; + const std::string &row) const; }; } // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/region-location.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h index 7922c95..7887526 100644 --- a/hbase-native-client/core/region-location.h +++ b/hbase-native-client/core/region-location.h @@ -27,15 +27,19 @@ namespace hbase { class RegionLocation { public: - RegionLocation(hbase::pb::RegionInfo ri, hbase::pb::ServerName sn, + RegionLocation(std::string region_name, hbase::pb::RegionInfo ri, + hbase::pb::ServerName sn, std::shared_ptr<HBaseService> service) - : ri_(ri), sn_(sn), service_(service) {} + : region_name_(region_name), ri_(ri), sn_(sn), service_(service) {} const hbase::pb::RegionInfo &region_info() { return ri_; } const hbase::pb::ServerName &server_name() { return sn_; } + const std::string &region_name() { return region_name_; } std::shared_ptr<HBaseService> service() { return service_; } + void set_service(std::shared_ptr<HBaseService> s) { service_ = s; } private: + std::string region_name_; hbase::pb::RegionInfo ri_; hbase::pb::ServerName sn_; std::shared_ptr<HBaseService> service_; http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/core/simple-client.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc 
index 00e3369..39c82c3 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -19,16 +19,21 @@ #include <folly/Logging.h> #include <folly/Random.h> +#include <folly/futures/Future.h> #include <gflags/gflags.h> +#include <wangle/concurrent/CPUThreadPoolExecutor.h> #include <wangle/concurrent/GlobalExecutor.h> +#include <atomic> #include <chrono> #include <iostream> +#include <thread> #include "connection/connection-pool.h" #include "core/client.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" +#include "serde/server-name.h" #include "serde/table-name.h" using namespace folly; @@ -39,16 +44,41 @@ using hbase::Request; using hbase::HBaseService; using hbase::LocationCache; using hbase::ConnectionPool; +using hbase::ConnectionFactory; using hbase::pb::TableName; using hbase::pb::ServerName; using hbase::pb::RegionSpecifier_RegionSpecifierType; -using hbase::pb::GetRequest; -using hbase::pb::GetResponse; +using hbase::pb::MutateRequest; +using hbase::pb::MutationProto_MutationType; // TODO(eclark): remove the need for this. 
DEFINE_string(table, "t", "What region to send a get"); DEFINE_string(row, "test", "What row to get"); DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to"); +DEFINE_uint64(columns, 10000, "How many columns to write"); +DEFINE_int32(threads, 6, "How many cpu threads"); + +std::unique_ptr<Request> MakeRequest(uint64_t col, std::string region_name) { + auto req = Request::mutate(); + auto msg = std::static_pointer_cast<MutateRequest>(req->req_msg()); + auto region = msg->mutable_region(); + auto suf = folly::to<std::string>(col); + + region->set_value(region_name); + region->set_type(RegionSpecifier_RegionSpecifierType:: + RegionSpecifier_RegionSpecifierType_REGION_NAME); + auto mutation = msg->mutable_mutation(); + mutation->set_row(FLAGS_row + suf); + mutation->set_mutate_type( + MutationProto_MutationType::MutationProto_MutationType_PUT); + auto column = mutation->add_column_value(); + column->set_family("d"); + auto qual = column->add_qualifier_value(); + qual->set_qualifier(suf); + qual->set_value("."); + + return std::move(req); +} int main(int argc, char *argv[]) { google::SetUsageMessage( @@ -56,13 +86,41 @@ int main(int argc, char *argv[]) { google::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); - // Create a connection factory - ConnectionPool cp; - auto cpu_ex = wangle::getCPUExecutor(); - LocationCache cache{FLAGS_zookeeper, cpu_ex}; - auto result = - cache.LocateFromMeta(folly::to<TableName>(FLAGS_table), FLAGS_row) - .get(milliseconds(5000)); + // Set up thread pools. + auto cpu_pool = + std::make_shared<wangle::CPUThreadPoolExecutor>(FLAGS_threads); + wangle::setCPUExecutor(cpu_pool); + auto io_pool = std::make_shared<wangle::IOThreadPoolExecutor>(5); + wangle::setIOExecutor(io_pool); + + // Create the cache. 
+ LocationCache cache{FLAGS_zookeeper, cpu_pool}; + + auto row = FLAGS_row; + auto tn = folly::to<TableName>(FLAGS_table); + + auto loc = cache.LocateFromMeta(tn, row).get(milliseconds(5000)); + auto connection = loc->service(); + + auto num_puts = FLAGS_columns; + + auto results = std::vector<Future<Response>>{}; + uint64_t col{0}; + for (; col < num_puts; col++) { + results.push_back(folly::makeFuture(col) + .via(cpu_pool.get()) + .then([loc](uint64_t col) { + return MakeRequest(col, loc->region_name()); + }) + .then([connection](std::unique_ptr<Request> req) { + return (*connection)(std::move(req)); + })); + } + auto allf = folly::collect(results).get(); + + LOG(ERROR) << "Successfully sent " << allf.size() << " requests."; + + io_pool->stop(); return 0; } http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/serde/region-info-deserializer-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/region-info-deserializer-test.cc b/hbase-native-client/serde/region-info-deserializer-test.cc index ce8dedf..5cb8482 100644 --- a/hbase-native-client/serde/region-info-deserializer-test.cc +++ b/hbase-native-client/serde/region-info-deserializer-test.cc @@ -44,7 +44,6 @@ TEST(TestRegionInfoDesializer, TestDeserialize) { ri_out.set_start_key(start_row); ri_out.set_end_key(stop_row); - string header{"PBUF"}; string ser = header + ri_out.SerializeAsString(); http://git-wip-us.apache.org/repos/asf/hbase/blob/79b5085d/hbase-native-client/serde/region-info.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/region-info.h b/hbase-native-client/serde/region-info.h index 6af351c..e2ecfc9 100644 --- a/hbase-native-client/serde/region-info.h +++ b/hbase-native-client/serde/region-info.h @@ -21,16 +21,16 @@ #include "if/HBase.pb.h" -#include <folly/Conv.h> #include <boost/algorithm/string/predicate.hpp> +#include <folly/Conv.h> namespace 
hbase { namespace pb { -template <class String> void parseTo(String in, RegionInfo& out) { +template <class String> void parseTo(String in, RegionInfo &out) { // TODO(eclark): there has to be something better. std::string s = folly::to<std::string>(in); - if (!boost::starts_with(s, "PBUF") ) { + if (!boost::starts_with(s, "PBUF")) { throw std::runtime_error("Region Info field doesn't contain preamble"); } if (!out.ParseFromArray(s.data() + 4, s.size() - 4)) {