This is an automated email from the ASF dual-hosted git repository. zghao pushed a commit to branch HBASE-14850 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 4022d59161447dbc70722fd8f4fd418def5e9fc6 Author: Elliott Clark <[email protected]> AuthorDate: Tue May 3 12:17:07 2016 -0700 HBASE-15750 Add on meta deserialization Summary: Add on meta region info deserialization Test Plan: Unit tests. Simple client connects. Differential Revision: https://reviews.facebook.net/D57555 --- .../connection/client-dispatcher.cc | 1 + hbase-native-client/connection/client-handler.cc | 13 ++-- hbase-native-client/connection/client-handler.h | 6 +- hbase-native-client/connection/connection-pool.cc | 36 +---------- hbase-native-client/connection/connection-pool.h | 8 +-- hbase-native-client/core/BUCK | 4 -- hbase-native-client/core/location-cache-test.cc | 1 + hbase-native-client/core/location-cache.cc | 73 +++++++++++++++++++--- hbase-native-client/core/location-cache.h | 7 +-- hbase-native-client/core/meta-utils.cc | 9 +-- hbase-native-client/core/meta-utils.h | 7 ++- hbase-native-client/core/region-location.h | 5 +- hbase-native-client/core/simple-client.cc | 6 +- hbase-native-client/serde/BUCK | 56 ++++++++--------- .../serde/client-deserializer-test.cc | 25 ++++---- hbase-native-client/serde/client-deserializer.cc | 68 -------------------- .../serde/client-serializer-test.cc | 26 ++++---- .../region-info-deserializer-test.cc} | 58 +++++++++-------- .../{core/table-name.h => serde/region-info.h} | 31 ++++----- .../serde/{client-serializer.cc => rpc.cc} | 70 ++++++++++++++++----- .../serde/{client-serializer.h => rpc.h} | 31 ++++----- .../{client-deserializer.h => server-name-test.cc} | 24 +++---- hbase-native-client/serde/server-name.h | 21 +++++++ .../{core => serde}/table-name-test.cc | 2 +- hbase-native-client/{core => serde}/table-name.h | 22 ++++--- hbase-native-client/serde/zk-deserializer-test.cc | 8 +-- .../serde/{zk-deserializer.cc => zk.cc} | 4 +- .../serde/{zk-deserializer.h => zk.h} | 2 +- 28 files changed, 316 insertions(+), 308 deletions(-) diff --git a/hbase-native-client/connection/client-dispatcher.cc b/hbase-native-client/connection/client-dispatcher.cc index 817adc1..6e2dc54 100644 --- a/hbase-native-client/connection/client-dispatcher.cc +++ b/hbase-native-client/connection/client-dispatcher.cc @@ -44,6 +44,7 @@ Future<Response> ClientDispatcher::operator()(std::unique_ptr<Request> arg) { auto &p = requests_[call_id]; auto f = p.getFuture(); p.setInterruptHandler([call_id, this](const folly::exception_wrapper &e) { + LOG(ERROR) << "e = " << call_id; this->requests_.erase(call_id); }); this->pipeline_->write(std::move(arg)); diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index 3180f4e..496e4f2 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -37,8 +37,7 @@ using hbase::pb::GetResponse; using google::protobuf::Message; ClientHandler::ClientHandler(std::string user_name) - : user_name_(user_name), need_send_header_(true), ser_(), deser_(), - resp_msgs_() {} + : user_name_(user_name), need_send_header_(true), serde_(), resp_msgs_() {} void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { if (LIKELY(buf != nullptr)) { @@ -46,7 +45,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { Response received; ResponseHeader header; - int used_bytes = deser_.parse_delimited(buf.get(), &header); + int used_bytes = serde_.ParseDelimited(buf.get(), &header); LOG(INFO) << "Read ResponseHeader size=" << used_bytes << " call_id=" << header.call_id() << " has_exception=" << header.has_exception(); @@ -70,7 +69,7 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<IOBuf> buf) { // data left on the wire. if (header.has_exception() == false) { buf->trimStart(used_bytes); - used_bytes = deser_.parse_delimited(buf.get(), resp_msg.get()); + used_bytes = serde_.ParseDelimited(buf.get(), resp_msg.get()); // Make sure that bytes were parsed. CHECK(used_bytes == buf->length()); received.set_response(resp_msg); @@ -91,13 +90,13 @@ Future<Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) { // and one for the request. // // That doesn't seem like too bad, but who knows. - auto pre = ser_.preamble(); - auto header = ser_.header(user_name_); + auto pre = serde_.Preamble(); + auto header = serde_.Header(user_name_); pre->appendChain(std::move(header)); ctx->fireWrite(std::move(pre)); } resp_msgs_[r->call_id()] = r->resp_msg(); return ctx->fireWrite( - ser_.request(r->call_id(), r->method(), r->req_msg().get())); + serde_.Request(r->call_id(), r->method(), r->req_msg().get())); } diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index 68513de..ce99c9e 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -22,8 +22,7 @@ #include <string> -#include "serde/client-deserializer.h" -#include "serde/client-serializer.h" +#include "serde/rpc.h" // Forward decs. namespace hbase { @@ -49,8 +48,7 @@ public: private: bool need_send_header_; std::string user_name_; - ClientSerializer ser_; - ClientDeserializer deser_; + RpcSerde serde_; // in flight requests std::unordered_map<uint32_t, std::shared_ptr<google::protobuf::Message>> diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index 72c1306..eafe60a 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -25,41 +25,10 @@ using std::mutex; using std::unique_ptr; using std::shared_ptr; using hbase::pb::ServerName; -using wangle::ServiceFilter; using folly::SharedMutexWritePriority; namespace hbase { -class RemoveServiceFilter - : public ServiceFilter<unique_ptr<Request>, Response> { - -public: - RemoveServiceFilter(std::shared_ptr<HBaseService> service, ServerName sn, - ConnectionPool &cp) - : ServiceFilter<unique_ptr<Request>, Response>(service), sn_(sn), - cp_(cp) {} - - folly::Future<folly::Unit> close() override { - if (!released.exchange(true)) { - return this->service_->close().then( - [this]() { this->cp_.close(this->sn_); }); - } else { - return folly::makeFuture(); - } - } - - virtual bool isAvailable() override { return service_->isAvailable(); } - - folly::Future<Response> operator()(unique_ptr<Request> req) override { - return (*this->service_)(std::move(req)); - } - -private: - std::atomic<bool> released{false}; - hbase::pb::ServerName sn_; - ConnectionPool &cp_; -}; - ConnectionPool::ConnectionPool() : cf_(std::make_shared<ConnectionFactory>()), connections_(), map_mutex_() { } @@ -72,13 +41,12 @@ std::shared_ptr<HBaseService> ConnectionPool::get(const ServerName &sn) { if (found == connections_.end() || found->second == nullptr) { SharedMutexWritePriority::WriteHolder holder(std::move(holder)); auto new_con = cf_->make_connection(sn.host_name(), sn.port()); - auto wrapped = std::make_shared<RemoveServiceFilter>(new_con, sn, *this); - connections_[sn] = wrapped; + connections_[sn] = new_con; return new_con; } return found->second; } -void ConnectionPool::close(ServerName sn) { +void ConnectionPool::close(const ServerName &sn) { SharedMutexWritePriority::WriteHolder holder(map_mutex_); auto found = connections_.find(sn); diff --git a/hbase-native-client/connection/connection-pool.h b/hbase-native-client/connection/connection-pool.h index 394cd71..b8330e3 100644 --- a/hbase-native-client/connection/connection-pool.h +++ b/hbase-native-client/connection/connection-pool.h @@ -27,13 +27,13 @@ #include "if/HBase.pb.h" namespace hbase { -struct MyServerNameEquals { +struct ServerNameEquals { bool operator()(const hbase::pb::ServerName &lhs, const hbase::pb::ServerName &rhs) const { return lhs.host_name() == rhs.host_name() && lhs.port() == rhs.port(); } }; -struct MyServerNameHash { +struct ServerNameHash { std::size_t operator()(hbase::pb::ServerName const &s) const { std::size_t h1 = std::hash<std::string>()(s.host_name()); std::size_t h2 = std::hash<uint32_t>()(s.port()); @@ -46,12 +46,12 @@ public: ConnectionPool(); explicit ConnectionPool(std::shared_ptr<ConnectionFactory> cf); std::shared_ptr<HBaseService> get(const hbase::pb::ServerName &sn); - void close(hbase::pb::ServerName sn); + void close(const hbase::pb::ServerName &sn); private: std::shared_ptr<ConnectionFactory> cf_; std::unordered_map<hbase::pb::ServerName, std::shared_ptr<HBaseService>, - MyServerNameHash, MyServerNameEquals> + ServerNameHash, ServerNameEquals> connections_; folly::SharedMutexWritePriority map_mutex_; }; diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index 447248b..ef8c2f8 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -24,7 +24,6 @@ cxx_library( "hbase_macros.h", "region-location.h", "location-cache.h", - "table-name.h", # TODO: move this out of exported # Once meta lookup works "meta-utils.h", @@ -53,9 +52,6 @@ cxx_test(name="location-cache-test", ], deps=[":core", ], run_test_separately=True, ) -cxx_test(name="table-name-test", - srcs=["table-name-test.cc", ], - deps=[":core", ], ) cxx_binary(name="simple-client", srcs=["simple-client.cc", ], deps=[":core", "//connection:connection"], ) diff --git a/hbase-native-client/core/location-cache-test.cc b/hbase-native-client/core/location-cache-test.cc index f3166fb..172799d 100644 --- a/hbase-native-client/core/location-cache-test.cc +++ b/hbase-native-client/core/location-cache-test.cc @@ -30,4 +30,5 @@ TEST(LocationCacheTest, TestGetMetaNodeContents) { auto result = f.get(); ASSERT_FALSE(f.hasException()); ASSERT_TRUE(result.has_port()); + ASSERT_TRUE(result.has_host_name()); } diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index 539051a..2667f11 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -20,19 +20,25 @@ #include <folly/Logging.h> #include <folly/io/IOBuf.h> +#include <wangle/concurrent/GlobalExecutor.h> #include "connection/response.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" -#include "serde/zk-deserializer.h" +#include "serde/server-name.h" +#include "serde/region-info.h" +#include "serde/zk.h" using namespace std; using namespace folly; +using wangle::ServiceFilter; +using hbase::Request; using hbase::Response; using hbase::LocationCache; using hbase::RegionLocation; using hbase::HBaseService; +using hbase::ConnectionPool; using hbase::pb::ScanResponse; using hbase::pb::TableName; using hbase::pb::ServerName; @@ -45,7 +51,7 @@ static const char META_ZNODE_NAME[] = "/hbase/meta-region-server"; LocationCache::LocationCache(string quorum_spec, shared_ptr<folly::Executor> executor) : quorum_spec_(quorum_spec), executor_(executor), meta_promise_(nullptr), - meta_lock_(), cp_(), meta_util_() { + meta_lock_(), cp_(), meta_util_(), zk_(nullptr) { zk_ = zookeeper_init(quorum_spec.c_str(), nullptr, 1000, 0, 0, 0); } @@ -95,28 +101,77 @@ ServerName LocationCache::ReadMetaLocation() { buf->append(len); MetaRegionServer mrs; - if (derser.parse(buf.get(), &mrs) == false) { + if (derser.Parse(buf.get(), &mrs) == false) { LOG(ERROR) << "Unable to decode"; } return mrs.server(); } -Future<RegionLocation> LocationCache::locateFromMeta(const TableName &tn, - const string &row) { +Future<std::shared_ptr<RegionLocation>> +LocationCache::LocateFromMeta(const TableName &tn, const string &row) { + auto exc = wangle::getIOExecutor(); return this->LocateMeta() .then([&](ServerName sn) { return this->cp_.get(sn); }) + .via(exc.get()) // Need to handle all rpc's on the IOExecutor. .then([&](std::shared_ptr<HBaseService> service) { - return (*service)(std::move(meta_util_.make_meta_request(tn, row))); + return (*service)(std::move(meta_util_.MetaRequest(tn, row))); }) .then([&](Response resp) { // take the protobuf response and make it into // a region location. - return this->parse_response(std::move(resp)); + return this->CreateLocation(std::move(resp)); }); } -RegionLocation LocationCache::parse_response(const Response &resp) { +class RemoveServiceFilter + : public ServiceFilter<std::unique_ptr<Request>, Response> { + +public: + RemoveServiceFilter(std::shared_ptr<HBaseService> service, ServerName sn, + ConnectionPool &cp) + : ServiceFilter<unique_ptr<Request>, Response>(service), sn_(sn), + cp_(cp) {} + + folly::Future<folly::Unit> close() override { + if (!released.exchange(true)) { + return this->service_->close().then([this]() { + // TODO(eclark): remove the service from the meta cache. + this->cp_.close(this->sn_); + }); + } else { + return folly::makeFuture(); + } + } + + virtual bool isAvailable() override { + return !released && service_->isAvailable(); + } + + folly::Future<Response> operator()(unique_ptr<Request> req) override { + // TODO(eclark): add in an on error handler that will + // remove the region location from the cache if needed. + // Also close the connection if this is likely to be an error + // that needs to get a new connection. + return (*this->service_)(std::move(req)); + } + +private: + std::atomic<bool> released{false}; + hbase::pb::ServerName sn_; + ConnectionPool &cp_; +}; + +std::shared_ptr<RegionLocation> +LocationCache::CreateLocation(const Response &resp){ auto resp_msg = static_pointer_cast<ScanResponse>(resp.response()); + auto &results = resp_msg->results().Get(0); + auto &cells = results.cell(); LOG(ERROR) << "resp_msg = " << resp_msg->DebugString(); - return RegionLocation{RegionInfo{}, ServerName{}, nullptr}; + auto ri = folly::to<RegionInfo>(cells.Get(0).value()); + auto sn = folly::to<ServerName>(cells.Get(1).value()); + + LOG(ERROR) << "RegionInfo = " << ri.DebugString(); + LOG(ERROR) << "ServerName = " << sn.DebugString(); + auto wrapped = make_shared<RemoveServiceFilter>(cp_.get(sn), sn, this->cp_); + return std::make_shared<RegionLocation>(std::move(ri), std::move(sn), wrapped); } diff --git a/hbase-native-client/core/location-cache.h b/hbase-native-client/core/location-cache.h index cfd6838..99b5e5e 100644 --- a/hbase-native-client/core/location-cache.h +++ b/hbase-native-client/core/location-cache.h @@ -29,8 +29,8 @@ #include "connection/connection-pool.h" #include "core/meta-utils.h" -#include "core/table-name.h" #include "core/region-location.h" +#include "serde/table-name.h" namespace hbase { @@ -48,14 +48,14 @@ public: // Meta Related Methods. // These are only public until testing is complete folly::Future<hbase::pb::ServerName> LocateMeta(); - folly::Future<RegionLocation> locateFromMeta(const hbase::pb::TableName &tn, + folly::Future<std::shared_ptr<RegionLocation>> LocateFromMeta(const hbase::pb::TableName &tn, const std::string &row); - RegionLocation parse_response(const Response &resp); void InvalidateMeta(); private: void RefreshMetaLocation(); hbase::pb::ServerName ReadMetaLocation(); + std::shared_ptr<RegionLocation> CreateLocation(const Response &resp); std::string quorum_spec_; std::shared_ptr<folly::Executor> executor_; @@ -64,7 +64,6 @@ private: ConnectionPool cp_; MetaUtil meta_util_; - // TODO: migrate this to a smart pointer with a deleter. zhandle_t *zk_; }; diff --git a/hbase-native-client/core/meta-utils.cc b/hbase-native-client/core/meta-utils.cc index d2fdd88..1325d83 100644 --- a/hbase-native-client/core/meta-utils.cc +++ b/hbase-native-client/core/meta-utils.cc @@ -23,25 +23,26 @@ #include "connection/request.h" #include "connection/response.h" -#include "core/table-name.h" #include "if/Client.pb.h" +#include "serde/table-name.h" using hbase::pb::TableName; using hbase::MetaUtil; using hbase::Request; using hbase::Response; using hbase::pb::ScanRequest; +using hbase::pb::ServerName; using hbase::pb::RegionSpecifier_RegionSpecifierType; static const std::string META_REGION = "1588230740"; -std::string MetaUtil::region_lookup_rowkey(const TableName &tn, +std::string MetaUtil::RegionLookupRowkey(const TableName &tn, const std::string &row) const { return folly::to<std::string>(tn, ",", row, ",", "999999999999999999"); } std::unique_ptr<Request> -MetaUtil::make_meta_request(const TableName tn, const std::string &row) const { +MetaUtil::MetaRequest(const TableName tn, const std::string &row) const { auto request = Request::scan(); auto msg = std::static_pointer_cast<ScanRequest>(request->req_msg()); @@ -76,6 +77,6 @@ MetaUtil::make_meta_request(const TableName tn, const std::string &row) const { info_col->add_qualifier("server"); info_col->add_qualifier("regioninfo"); - scan->set_start_row(region_lookup_rowkey(tn, row)); + scan->set_start_row(RegionLookupRowkey(tn, row)); return request; } diff --git a/hbase-native-client/core/meta-utils.h b/hbase-native-client/core/meta-utils.h index e007d02..5a659f3 100644 --- a/hbase-native-client/core/meta-utils.h +++ b/hbase-native-client/core/meta-utils.h @@ -21,15 +21,16 @@ #include <string> #include "connection/Request.h" -#include "core/table-name.h" +#include "if/HBase.pb.h" +#include "serde/table-name.h" namespace hbase { class MetaUtil { public: - std::string region_lookup_rowkey(const hbase::pb::TableName &tn, + std::string RegionLookupRowkey(const hbase::pb::TableName &tn, const std::string &row) const; - std::unique_ptr<Request> make_meta_request(const hbase::pb::TableName tn, + std::unique_ptr<Request> MetaRequest(const hbase::pb::TableName tn, const std::string &row) const; }; } // namespace hbase diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h index a46b8e2..7922c95 100644 --- a/hbase-native-client/core/region-location.h +++ b/hbase-native-client/core/region-location.h @@ -23,7 +23,6 @@ #include "connection/service.h" #include "if/HBase.pb.h" - namespace hbase { class RegionLocation { @@ -32,8 +31,8 @@ public: std::shared_ptr<HBaseService> service) : ri_(ri), sn_(sn), service_(service) {} - const hbase::pb::RegionInfo& region_info() { return ri_; } - const hbase::pb::ServerName& server_name() { return sn_; } + const hbase::pb::RegionInfo ®ion_info() { return ri_; } + const hbase::pb::ServerName &server_name() { return sn_; } std::shared_ptr<HBaseService> service() { return service_; } private: diff --git a/hbase-native-client/core/simple-client.cc b/hbase-native-client/core/simple-client.cc index ab614e4..00e3369 100644 --- a/hbase-native-client/core/simple-client.cc +++ b/hbase-native-client/core/simple-client.cc @@ -27,9 +27,9 @@ #include "connection/connection-pool.h" #include "core/client.h" -#include "core/table-name.h" #include "if/Client.pb.h" #include "if/ZooKeeper.pb.h" +#include "serde/table-name.h" using namespace folly; using namespace std; @@ -39,7 +39,7 @@ using hbase::Request; using hbase::HBaseService; using hbase::LocationCache; using hbase::ConnectionPool; -using hbase::TableNameUtil; +using hbase::pb::TableName; using hbase::pb::ServerName; using hbase::pb::RegionSpecifier_RegionSpecifierType; using hbase::pb::GetRequest; @@ -61,7 +61,7 @@ int main(int argc, char *argv[]) { auto cpu_ex = wangle::getCPUExecutor(); LocationCache cache{FLAGS_zookeeper, cpu_ex}; auto result = - cache.locateFromMeta(TableNameUtil::create(FLAGS_table), FLAGS_row) + cache.LocateFromMeta(folly::to<TableName>(FLAGS_table), FLAGS_row) .get(milliseconds(5000)); return 0; diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK index 539a221..db15026 100644 --- a/hbase-native-client/serde/BUCK +++ b/hbase-native-client/serde/BUCK @@ -17,49 +17,47 @@ cxx_library(name="serde", exported_headers=[ - "client-serializer.h", - "client-deserializer.h", - "zk-deserializer.h", + "region-info.h", + "rpc.h", + "server-name.h", + "table-name.h", + "zk.h", ], srcs=[ - "client-serializer.cc", - "client-deserializer.cc", - "zk-deserializer.cc", + "rpc.cc", + "zk.cc", ], deps=[ "//if:if", "//third-party:folly", ], tests=[ - ":client-serializer-test", ":client-deserializer-test", + ":client-serializer-test", + ":server-name-test", + ":table-name-test", + ":zk-deserializer-test", + ":region-info-deserializer-test", ], compiler_flags=['-Weffc++'], visibility=[ 'PUBLIC', ], ) - +cxx_test(name="table-name-test", + srcs=["table-name-test.cc", ], + deps=[":serde", ], ) +cxx_test(name="server-name-test", + srcs=["server-name-test.cc", ], + deps=[":serde", ], ) cxx_test(name="client-serializer-test", - srcs=[ - "client-serializer-test.cc", - ], - deps=[ - ":serde", - "//if:if", - ], ) + srcs=["client-serializer-test.cc", ], + deps=[":serde", ], ) cxx_test(name="client-deserializer-test", - srcs=[ - "client-deserializer-test.cc", - ], - deps=[ - ":serde", - "//if:if", - ], ) + srcs=["client-deserializer-test.cc", ], + deps=[":serde", ], ) cxx_test(name="zk-deserializer-test", - srcs=[ - "zk-deserializer-test.cc", - ], - deps=[ - ":serde", - "//if:if", - ], ) + srcs=["zk-deserializer-test.cc", ], + deps=[":serde", ], ) +cxx_test(name="region-info-deserializer-test", + srcs=["region-info-deserializer-test.cc", ], + deps=[":serde", ], ) diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc index 9fef093..8c571b1 100644 --- a/hbase-native-client/serde/client-deserializer-test.cc +++ b/hbase-native-client/serde/client-deserializer-test.cc @@ -16,13 +16,12 @@ * limitations under the License. * */ +#include "serde/rpc.h" #include <folly/io/IOBuf.h> #include <gtest/gtest.h> #include "if/Client.pb.h" -#include "serde/client-deserializer.h" -#include "serde/client-serializer.h" using namespace hbase; using folly::IOBuf; @@ -30,23 +29,23 @@ using hbase::pb::GetRequest; using hbase::pb::RegionSpecifier; using hbase::pb::RegionSpecifier_RegionSpecifierType; -TEST(TestClientDeserializer, TestReturnFalseOnNullPtr) { - ClientDeserializer deser; - ASSERT_LT(deser.parse_delimited(nullptr, nullptr), 0); +TEST(TestRpcSerde, TestReturnFalseOnNullPtr) { + RpcSerde deser; + ASSERT_LT(deser.ParseDelimited(nullptr, nullptr), 0); } -TEST(TestClientDeserializer, TestReturnFalseOnBadInput) { - ClientDeserializer deser; +TEST(TestRpcSerde, TestReturnFalseOnBadInput) { + RpcSerde deser; auto buf = IOBuf::copyBuffer("test"); GetRequest gr; - ASSERT_LT(deser.parse_delimited(buf.get(), &gr), 0); + ASSERT_LT(deser.ParseDelimited(buf.get(), &gr), 0); } -TEST(TestClientDeserializer, TestGoodGetRequestFullRoundTrip) { +TEST(TestRpcSerde, TestGoodGetRequestFullRoundTrip) { GetRequest in; - ClientSerializer ser; - ClientDeserializer deser; + RpcSerde ser; + RpcSerde deser; // fill up the GetRequest. in.mutable_region()->set_value("test_region_id"); @@ -56,11 +55,11 @@ TEST(TestClientDeserializer, TestGoodGetRequestFullRoundTrip) { in.mutable_get()->set_row("test_row"); // Create the buffer - auto buf = ser.serialize_delimited(in); + auto buf = ser.SerializeDelimited(in); GetRequest out; - int used_bytes = deser.parse_delimited(buf.get(), &out); + int used_bytes = deser.ParseDelimited(buf.get(), &out); ASSERT_GT(used_bytes, 0); ASSERT_EQ(used_bytes, buf->length()); diff --git a/hbase-native-client/serde/client-deserializer.cc b/hbase-native-client/serde/client-deserializer.cc deleted file mode 100644 index acca7ea..0000000 --- a/hbase-native-client/serde/client-deserializer.cc +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "serde/client-deserializer.h" - -#include <folly/Logging.h> -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream_impl_lite.h> -#include <google/protobuf/message.h> - -using namespace hbase; - -using folly::IOBuf; -using google::protobuf::Message; -using google::protobuf::io::ArrayInputStream; -using google::protobuf::io::CodedInputStream; - -int ClientDeserializer::parse_delimited(const IOBuf *buf, Message *msg) { - if (buf == nullptr || msg == nullptr) { - return -2; - } - - DCHECK(!buf->isChained()); - - ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())}; - CodedInputStream coded_stream{&ais}; - - uint32_t msg_size; - - // Try and read the varint. - if (coded_stream.ReadVarint32(&msg_size) == false) { - FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t"; - return -3; - } - - coded_stream.PushLimit(msg_size); - // Parse the message. - if (msg->MergeFromCodedStream(&coded_stream) == false) { - FB_LOG_EVERY_MS(ERROR, 1000) - << "Unable to read a protobuf message from data."; - return -4; - } - - // Make sure all the data was consumed. - if (coded_stream.ConsumedEntireMessage() == false) { - FB_LOG_EVERY_MS(ERROR, 1000) - << "Orphaned data left after reading protobuf message"; - return -5; - } - - return coded_stream.CurrentPosition(); -} diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc index 9bf38af..2bd17fb 100644 --- a/hbase-native-client/serde/client-serializer-test.cc +++ b/hbase-native-client/serde/client-serializer-test.cc @@ -24,16 +24,16 @@ #include "if/HBase.pb.h" #include "if/RPC.pb.h" -#include "serde/client-serializer.h" +#include "serde/rpc.h" using namespace hbase; using namespace hbase::pb; using namespace folly; using namespace folly::io; -TEST(ClientSerializerTest, PreambleIncludesHBas) { - ClientSerializer ser; - auto buf = ser.preamble(); +TEST(RpcSerdeTest, PreambleIncludesHBas) { + RpcSerde ser; + auto buf = ser.Preamble(); const char *p = reinterpret_cast<const char *>(buf->data()); // Take the first for chars and make sure they are the // magic string @@ -42,16 +42,16 @@ TEST(ClientSerializerTest, PreambleIncludesHBas) { EXPECT_EQ(6, buf->computeChainDataLength()); } -TEST(ClientSerializerTest, PreambleIncludesVersion) { - ClientSerializer ser; - auto buf = ser.preamble(); +TEST(RpcSerdeTest, PreambleIncludesVersion) { + RpcSerde ser; + auto buf = ser.Preamble(); EXPECT_EQ(0, static_cast<const uint8_t *>(buf->data())[4]); EXPECT_EQ(80, static_cast<const uint8_t *>(buf->data())[5]); } -TEST(ClientSerializerTest, TestHeaderLengthPrefixed) { - ClientSerializer ser; - auto header = ser.header("elliott"); +TEST(RpcSerdeTest, TestHeaderLengthPrefixed) { + RpcSerde ser; + auto header = ser.Header("elliott"); // The header should be prefixed by 4 bytes of length. EXPECT_EQ(4, header->length()); @@ -64,9 +64,9 @@ TEST(ClientSerializerTest, TestHeaderLengthPrefixed) { EXPECT_EQ(prefixed_len, header->next()->length()); } -TEST(ClientSerializerTest, TestHeaderDecode) { - ClientSerializer ser; - auto buf = ser.header("elliott"); +TEST(RpcSerdeTest, TestHeaderDecode) { + RpcSerde ser; + auto buf = ser.Header("elliott"); auto header_buf = buf->next(); ConnectionHeader h; diff --git a/hbase-native-client/core/table-name.h b/hbase-native-client/serde/region-info-deserializer-test.cc similarity index 53% copy from hbase-native-client/core/table-name.h copy to hbase-native-client/serde/region-info-deserializer-test.cc index 1612667..ce8dedf 100644 --- a/hbase-native-client/core/table-name.h +++ b/hbase-native-client/serde/region-info-deserializer-test.cc @@ -16,35 +16,39 @@ * limitations under the License. * */ -#pragma once -#include <memory> +#include "serde/region-info.h" + +#include <gtest/gtest.h> + #include <string> #include "if/HBase.pb.h" -#include <folly/Conv.h> - -namespace hbase { -namespace pb { - -// Provide folly::to<std::string>(TableName); -template <class String> void toAppend(const TableName &in, String *result) { - if (!in.has_namespace_() || in.namespace_() == "default") { - folly::toAppend(in.qualifier(), result); - } else { - folly::toAppend(in.namespace_(), ':', in.qualifier(), result); - } -} +#include "serde/table-name.h" + +using std::string; +using hbase::pb::RegionInfo; +using hbase::pb::TableName; + +TEST(TestRegionInfoDesializer, TestDeserialize) { + string ns{"test_ns"}; + string tn{"table_name"}; + string start_row{"AAAAAA"}; + string stop_row{"BBBBBBBBBBBB"}; + uint64_t region_id = 2345678; -} // namespace pb - -class TableNameUtil { -public: - static ::hbase::pb::TableName create(std::string table_name) { - ::hbase::pb::TableName tn; - tn.set_namespace_("default"); - tn.set_qualifier(table_name); - return tn; - } -}; -} // namespace hbase + RegionInfo ri_out; + ri_out.set_region_id(region_id); + ri_out.mutable_table_name()->set_namespace_(ns); + ri_out.mutable_table_name()->set_qualifier(tn); + ri_out.set_start_key(start_row); + ri_out.set_end_key(stop_row); + + + string header{"PBUF"}; + string ser = header + ri_out.SerializeAsString(); + + auto out = folly::to<RegionInfo>(ser); + + EXPECT_EQ(region_id, out.region_id()); +} diff --git a/hbase-native-client/core/table-name.h b/hbase-native-client/serde/region-info.h similarity index 63% copy from hbase-native-client/core/table-name.h copy to hbase-native-client/serde/region-info.h index 1612667..6af351c 100644 --- a/hbase-native-client/core/table-name.h +++ b/hbase-native-client/serde/region-info.h @@ -16,35 +16,26 @@ * limitations under the License. * */ -#pragma once -#include <memory> -#include <string> +#pragma once #include "if/HBase.pb.h" + #include <folly/Conv.h> +#include <boost/algorithm/string/predicate.hpp> namespace hbase { namespace pb { +template <class String> void parseTo(String in, RegionInfo& out) { + // TODO(eclark): there has to be something better. + std::string s = folly::to<std::string>(in); -// Provide folly::to<std::string>(TableName); -template <class String> void toAppend(const TableName &in, String *result) { - if (!in.has_namespace_() || in.namespace_() == "default") { - folly::toAppend(in.qualifier(), result); - } else { - folly::toAppend(in.namespace_(), ':', in.qualifier(), result); + if (!boost::starts_with(s, "PBUF") ) { + throw std::runtime_error("Region Info field doesn't contain preamble"); + } + if (!out.ParseFromArray(s.data() + 4, s.size() - 4)) { + throw std::runtime_error("Bad protobuf for RegionInfo"); } } - } // namespace pb - -class TableNameUtil { -public: - static ::hbase::pb::TableName create(std::string table_name) { - ::hbase::pb::TableName tn; - tn.set_namespace_("default"); - tn.set_qualifier(table_name); - return tn; - } -}; } // namespace hbase diff --git a/hbase-native-client/serde/client-serializer.cc b/hbase-native-client/serde/rpc.cc similarity index 67% rename from hbase-native-client/serde/client-serializer.cc rename to hbase-native-client/serde/rpc.cc index 09b81c8..4c3c999 100644 --- a/hbase-native-client/serde/client-serializer.cc +++ b/hbase-native-client/serde/rpc.cc @@ -16,12 +16,15 @@ * limitations under the License. * */ -#include "serde/client-serializer.h" +#include "serde/rpc.h" + +#include <folly/Logging.h> #include <folly/Logging.h> #include <folly/io/Cursor.h> #include <google/protobuf/io/coded_stream.h> #include <google/protobuf/io/zero_copy_stream_impl_lite.h> +#include <google/protobuf/message.h> #include "if/HBase.pb.h" #include "if/RPC.pb.h" @@ -31,7 +34,10 @@ using namespace hbase; using folly::IOBuf; using folly::io::RWPrivateCursor; using google::protobuf::Message; +using google::protobuf::Message; +using google::protobuf::io::ArrayInputStream; using google::protobuf::io::ArrayOutputStream; +using google::protobuf::io::CodedInputStream; using google::protobuf::io::CodedOutputStream; using google::protobuf::io::ZeroCopyOutputStream; using std::string; @@ -42,9 +48,46 @@ static const std::string INTERFACE = "ClientService"; static const uint8_t RPC_VERSION = 0; static const uint8_t DEFAULT_AUTH_TYPE = 80; -ClientSerializer::ClientSerializer() : auth_type_(DEFAULT_AUTH_TYPE) {} +int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) { + if (buf == nullptr || msg == nullptr) { + return -2; + } + + DCHECK(!buf->isChained()); + + ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())}; + CodedInputStream coded_stream{&ais}; + + uint32_t msg_size; + + // Try and read the varint. + if (coded_stream.ReadVarint32(&msg_size) == false) { + FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t"; + return -3; + } + + coded_stream.PushLimit(msg_size); + // Parse the message. + if (msg->MergeFromCodedStream(&coded_stream) == false) { + FB_LOG_EVERY_MS(ERROR, 1000) + << "Unable to read a protobuf message from data."; + return -4; + } + + // Make sure all the data was consumed. + if (coded_stream.ConsumedEntireMessage() == false) { + FB_LOG_EVERY_MS(ERROR, 1000) + << "Orphaned data left after reading protobuf message"; + return -5; + } + + return coded_stream.CurrentPosition(); +} + +RpcSerde::RpcSerde() : auth_type_(DEFAULT_AUTH_TYPE) {} +RpcSerde::~RpcSerde() {} -unique_ptr<IOBuf> ClientSerializer::preamble() { +unique_ptr<IOBuf> RpcSerde::Preamble() { auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2); magic->append(2); RWPrivateCursor c(magic.get()); @@ -56,7 +99,7 @@ unique_ptr<IOBuf> ClientSerializer::preamble() { return magic; } -unique_ptr<IOBuf> ClientSerializer::header(const string &user) { +unique_ptr<IOBuf> RpcSerde::Header(const string &user) { pb::ConnectionHeader h; // TODO(eclark): Make this not a total lie. @@ -68,26 +111,25 @@ unique_ptr<IOBuf> ClientSerializer::header(const string &user) { // It worked for a while with the java client; until it // didn't. h.set_service_name(INTERFACE); - return prepend_length(serialize_message(h)); + return PrependLength(SerializeMessage(h)); } -unique_ptr<IOBuf> ClientSerializer::request(const uint32_t call_id, - const string &method, - const Message *msg) { +unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, + const string &method, const Message *msg) { pb::RequestHeader rq; rq.set_method_name(method); rq.set_call_id(call_id); rq.set_request_param(msg != nullptr); - auto ser_header = serialize_delimited(rq); + auto ser_header = SerializeDelimited(rq); if (msg != nullptr) { - auto ser_req = serialize_delimited(*msg); + auto ser_req = SerializeDelimited(*msg); ser_header->appendChain(std::move(ser_req)); } - return prepend_length(std::move(ser_header)); + return PrependLength(std::move(ser_header)); } -unique_ptr<IOBuf> ClientSerializer::prepend_length(unique_ptr<IOBuf> msg) { +unique_ptr<IOBuf> RpcSerde::PrependLength(unique_ptr<IOBuf> msg) { // Java ints are 4 long. So create a buffer that large auto len_buf = IOBuf::create(4); // Then make those bytes visible. @@ -105,7 +147,7 @@ unique_ptr<IOBuf> ClientSerializer::prepend_length(unique_ptr<IOBuf> msg) { return len_buf; } -unique_ptr<IOBuf> ClientSerializer::serialize_delimited(const Message &msg) { +unique_ptr<IOBuf> RpcSerde::SerializeDelimited(const Message &msg) { // Get the buffer size needed for just the message. int msg_size = msg.ByteSize(); int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size; @@ -133,7 +175,7 @@ unique_ptr<IOBuf> ClientSerializer::serialize_delimited(const Message &msg) { return buf; } // TODO(eclark): Make this 1 copy. -unique_ptr<IOBuf> ClientSerializer::serialize_message(const Message &msg) { +unique_ptr<IOBuf> RpcSerde::SerializeMessage(const Message &msg) { auto buf = IOBuf::copyBuffer(msg.SerializeAsString()); return buf; } diff --git a/hbase-native-client/serde/client-serializer.h b/hbase-native-client/serde/rpc.h similarity index 68% rename from hbase-native-client/serde/client-serializer.h rename to hbase-native-client/serde/rpc.h index 9c819fe..cefb583 100644 --- a/hbase-native-client/serde/client-serializer.h +++ b/hbase-native-client/serde/rpc.h @@ -18,38 +18,41 @@ */ #pragma once -#include <cstdint> -#include <folly/io/IOBuf.h> +#include <memory> #include <string> // Forward +namespace folly { +class IOBuf; +} namespace google { namespace protobuf { class Message; } } -namespace hbase { -class Request; -} namespace hbase { -class ClientSerializer { +class RpcSerde { public: - ClientSerializer(); - std::unique_ptr<folly::IOBuf> preamble(); - std::unique_ptr<folly::IOBuf> header(const std::string &user); - std::unique_ptr<folly::IOBuf> request(const uint32_t call_id, + RpcSerde(); + virtual ~RpcSerde(); + int ParseDelimited(const folly::IOBuf *buf, google::protobuf::Message *msg); + std::unique_ptr<folly::IOBuf> Preamble(); + std::unique_ptr<folly::IOBuf> Header(const std::string &user); + std::unique_ptr<folly::IOBuf> Request(const uint32_t call_id, const std::string &method, const google::protobuf::Message *msg); std::unique_ptr<folly::IOBuf> - serialize_delimited(const google::protobuf::Message &msg); + SerializeDelimited(const google::protobuf::Message &msg); std::unique_ptr<folly::IOBuf> - serialize_message(const google::protobuf::Message &msg); + SerializeMessage(const google::protobuf::Message &msg); std::unique_ptr<folly::IOBuf> - prepend_length(std::unique_ptr<folly::IOBuf> msg); + PrependLength(std::unique_ptr<folly::IOBuf> msg); +private: + /* data */ uint8_t auth_type_; }; -} // namespace hbase +} diff --git a/hbase-native-client/serde/client-deserializer.h b/hbase-native-client/serde/server-name-test.cc similarity index 75% rename from hbase-native-client/serde/client-deserializer.h rename to hbase-native-client/serde/server-name-test.cc index b9664b0..35dcbc1 100644 --- a/hbase-native-client/serde/client-deserializer.h +++ b/hbase-native-client/serde/server-name-test.cc @@ -16,21 +16,17 @@ * limitations under the License. * */ -#pragma once -#include <folly/io/IOBuf.h> +#include "serde/server-name.h" -// Forward -namespace google { -namespace protobuf { -class Message; -} -} +#include <gtest/gtest.h> +#include <string> -namespace hbase { -class ClientDeserializer { -public: - int parse_delimited(const folly::IOBuf *buf, google::protobuf::Message *msg); -}; +using hbase::pb::ServerName; -} // namespace hbase +TEST(TestServerName, TestMakeServerName) { + auto sn = folly::to<ServerName>("test:123"); + + ASSERT_EQ("test", sn.host_name()); + ASSERT_EQ(123, sn.port()); +} diff --git a/hbase-native-client/serde/server-name.h b/hbase-native-client/serde/server-name.h new file mode 100644 index 0000000..bdba087 --- /dev/null +++ b/hbase-native-client/serde/server-name.h @@ -0,0 +1,21 @@ +#pragma once + +#include "if/HBase.pb.h" +#include <folly/Conv.h> +#include <folly/String.h> + +namespace hbase { +namespace pb { + +template <class String> void parseTo(String in, ServerName &out) { + // TODO see about getting rsplit into folly. + std::string s = folly::to<std::string>(in); + + auto delim = s.rfind(":"); + DCHECK(delim != std::string::npos); + out.set_host_name(s.substr(0, delim)); + // Now keep everything after the : (delim + 1) to the end. + out.set_port(folly::to<int>(s.substr(delim + 1))); +} +} +} diff --git a/hbase-native-client/core/table-name-test.cc b/hbase-native-client/serde/table-name-test.cc similarity index 98% rename from hbase-native-client/core/table-name-test.cc rename to hbase-native-client/serde/table-name-test.cc index 7bad3f1..877d522 100644 --- a/hbase-native-client/core/table-name-test.cc +++ b/hbase-native-client/serde/table-name-test.cc @@ -22,7 +22,7 @@ #include <string> -#include "core/table-name.h" +#include "serde/table-name.h" using namespace hbase; using hbase::pb::TableName; diff --git a/hbase-native-client/core/table-name.h b/hbase-native-client/serde/table-name.h similarity index 80% rename from hbase-native-client/core/table-name.h rename to hbase-native-client/serde/table-name.h index 1612667..c81e166 100644 --- a/hbase-native-client/core/table-name.h +++ b/hbase-native-client/serde/table-name.h @@ -23,6 +23,7 @@ #include "if/HBase.pb.h" #include <folly/Conv.h> +#include <folly/String.h> namespace hbase { namespace pb { @@ -36,15 +37,18 @@ template <class String> void toAppend(const TableName &in, String *result) { } } -} // namespace pb +template <class String> void parseTo(String in, TableName &out) { + std::vector<std::string> v; + folly::split(":", in, v); -class TableNameUtil { -public: - static ::hbase::pb::TableName create(std::string table_name) { - ::hbase::pb::TableName tn; - tn.set_namespace_("default"); - tn.set_qualifier(table_name); - return tn; + if (v.size() == 1) { + out.set_namespace_("default"); + out.set_qualifier(v[0]); + } else { + out.set_namespace_(v[0]); + out.set_qualifier(v[1]); } -}; +} + +} // namespace pb } // namespace hbase diff --git a/hbase-native-client/serde/zk-deserializer-test.cc b/hbase-native-client/serde/zk-deserializer-test.cc index 92d85a0..f07eecf 100644 --- a/hbase-native-client/serde/zk-deserializer-test.cc +++ b/hbase-native-client/serde/zk-deserializer-test.cc @@ -17,7 +17,7 @@ * */ -#include "serde/zk-deserializer.h" +#include "serde/zk.h" #include <folly/Logging.h> #include <folly/io/Cursor.h> @@ -41,7 +41,7 @@ TEST(TestZkDesializer, TestThrowNoMagicNum) { buf->append(100); RWPrivateCursor c{buf.get()}; c.write<uint8_t>(99); - ASSERT_THROW(deser.parse(buf.get(), &mrs), runtime_error); + ASSERT_THROW(deser.Parse(buf.get(), &mrs), runtime_error); } // Test if the protobuf is in a format that we can't decode @@ -78,7 +78,7 @@ TEST(TestZkDesializer, TestBadProtoThrow) { // Create the protobuf MetaRegionServer out; - ASSERT_THROW(deser.parse(buf.get(), &out), runtime_error); + ASSERT_THROW(deser.Parse(buf.get(), &out), runtime_error); } // Test to make sure the whole thing works. @@ -118,6 +118,6 @@ TEST(TestZkDesializer, TestNoThrow) { // Create the protobuf MetaRegionServer out; - ASSERT_TRUE(deser.parse(buf.get(), &out)); + ASSERT_TRUE(deser.Parse(buf.get(), &out)); ASSERT_EQ(mrs.server().host_name(), out.server().host_name()); } diff --git a/hbase-native-client/serde/zk-deserializer.cc b/hbase-native-client/serde/zk.cc similarity index 96% rename from hbase-native-client/serde/zk-deserializer.cc rename to hbase-native-client/serde/zk.cc index 33cf809..59871a5 100644 --- a/hbase-native-client/serde/zk-deserializer.cc +++ b/hbase-native-client/serde/zk.cc @@ -17,7 +17,7 @@ * */ -#include "serde/zk-deserializer.h" +#include "serde/zk.h" #include <folly/io/Cursor.h> #include <folly/io/IOBuf.h> @@ -31,7 +31,7 @@ using google::protobuf::Message; static const std::string MAGIC_STRING = "PBUF"; -bool ZkDeserializer::parse(IOBuf *buf, Message *out) { +bool ZkDeserializer::Parse(IOBuf *buf, Message *out) { // The format is like this // 1 byte of magic number. 255 diff --git a/hbase-native-client/serde/zk-deserializer.h b/hbase-native-client/serde/zk.h similarity index 93% rename from hbase-native-client/serde/zk-deserializer.h rename to hbase-native-client/serde/zk.h index aa91661..b672bf4 100644 --- a/hbase-native-client/serde/zk-deserializer.h +++ b/hbase-native-client/serde/zk.h @@ -30,6 +30,6 @@ class IOBuf; namespace hbase { class ZkDeserializer { public: - bool parse(folly::IOBuf *buf, google::protobuf::Message *out); + bool Parse(folly::IOBuf *buf, google::protobuf::Message *out); }; } // namespace hbase
