Repository: hbase Updated Branches: refs/heads/HBASE-14850 239a8fc3e -> 915d89f51
HBASE-17727 [C++] Make RespConverter work with RawAsyncTableImpl Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/915d89f5 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/915d89f5 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/915d89f5 Branch: refs/heads/HBASE-14850 Commit: 915d89f51ab2069a136638b605f5296ea097e823 Parents: 239a8fc Author: Enis Soztutar <e...@apache.org> Authored: Tue Mar 28 13:52:31 2017 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Tue Mar 28 13:52:31 2017 -0700 ---------------------------------------------------------------------- .../core/async-rpc-retrying-test.cc | 62 +++++++++----------- hbase-native-client/core/raw-async-table.cc | 11 ++-- hbase-native-client/core/raw-async-table.h | 8 +-- hbase-native-client/core/response-converter.cc | 4 -- 4 files changed, 38 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/915d89f5/hbase-native-client/core/async-rpc-retrying-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc index d0c7921..5086286 100644 --- a/hbase-native-client/core/async-rpc-retrying-test.cc +++ b/hbase-native-client/core/async-rpc-retrying-test.cc @@ -137,15 +137,15 @@ class MockRawAsyncTableImpl { /* in real RawAsyncTableImpl, this should be private. */ template <typename REQ, typename PREQ, typename PRESP, typename RESP> - folly::Future<RESP> Call( - std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller, - std::shared_ptr<RegionLocation> loc, const REQ& req, - const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string>& req_converter, - const hbase::RpcCall<PREQ, PRESP>& rpc_call, - const RespConverter<RESP, PRESP>& resp_converter) { + folly::Future<RESP> Call(std::shared_ptr<hbase::RpcClient> rpc_client, + std::shared_ptr<HBaseRpcController> controller, + std::shared_ptr<RegionLocation> loc, const REQ& req, + ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter, + const hbase::RpcCall<PREQ, PRESP>& rpc_call, + RespConverter<RESP, PRESP> resp_converter) { rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name()))) - .then([&, this](std::unique_ptr<PRESP> presp) { - std::shared_ptr<hbase::Result> result = hbase::ResponseConverter::FromGetResponse(*presp); + .then([&, this, resp_converter](std::unique_ptr<PRESP> presp) { + RESP result = resp_converter(*presp); promise_->setValue(result); }) .onError([this](const std::exception& e) { promise_->setException(e); }); @@ -210,31 +210,27 @@ TEST(AsyncRpcRetryTest, TestGetBasic) { auto builder = conn->caller_factory()->Single<std::shared_ptr<hbase::Result>>(); /* call with retry to get result */ - try { - auto async_caller = - builder->table(std::make_shared<TableName>(tn)) - ->row(row) - ->rpc_timeout(conn->connection_conf()->read_rpc_timeout()) - ->operation_timeout(conn->connection_conf()->operation_timeout()) - ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller, - std::shared_ptr<hbase::RegionLocation> loc, - std::shared_ptr<hbase::RpcClient> rpc_client) - -> folly::Future<std::shared_ptr<hbase::Result>> { - return tableImpl->GetCall(rpc_client, controller, loc, get); - }) - ->Build(); - - auto result = async_caller->Call().get(); - - // Test the values, should be same as in put executed on hbase shell - ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; - EXPECT_EQ("test2", result->Row()); - EXPECT_EQ("value2", *(result->Value("d", "2"))); - EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); - } catch (std::exception& e) { - LOG(ERROR) << e.what(); - throw e; - } + + auto async_caller = + builder->table(std::make_shared<TableName>(tn)) + ->row(row) + ->rpc_timeout(conn->connection_conf()->read_rpc_timeout()) + ->operation_timeout(conn->connection_conf()->operation_timeout()) + ->action([=, &get](std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> rpc_client) + -> folly::Future<std::shared_ptr<hbase::Result>> { + return tableImpl->GetCall(rpc_client, controller, loc, get); + }) + ->Build(); + + auto result = async_caller->Call().get(); + + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!result->IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result->Row()); + EXPECT_EQ("value2", *(result->Value("d", "2"))); + EXPECT_EQ("value for extra", *(result->Value("d", "extra"))); table->Close(); client.Close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/915d89f5/hbase-native-client/core/raw-async-table.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc index 641f3c8..88a3382 100644 --- a/hbase-native-client/core/raw-async-table.cc +++ b/hbase-native-client/core/raw-async-table.cc @@ -16,6 +16,7 @@ * limitations under the License. * */ +#include <utility> #include "core/raw-async-table.h" #include "core/request-converter.h" @@ -41,18 +42,16 @@ template <typename REQ, typename PREQ, typename PRESP, typename RESP> folly::Future<RESP> RawAsyncTable::Call( std::shared_ptr<RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller, std::shared_ptr<RegionLocation> loc, const REQ& req, - const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string>& req_converter, - const RespConverter<RESP, PRESP>& resp_converter) { + const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter, + const RespConverter<RESP, PRESP> resp_converter) { std::unique_ptr<PREQ> preq = req_converter(req, loc->region_name()); // No need to make take a callable argument, it is always the same return rpc_client ->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), std::move(preq), User::defaultUser(), "ClientService") - .then([&](const std::unique_ptr<Response>& presp) { - return ResponseConverter::FromGetResponse(*presp); - // return resp_converter(*presp); // TODO this is causing SEGFAULT, figure out why - }); + .then( + [resp_converter](const std::unique_ptr<PRESP>& presp) { return resp_converter(*presp); }); } Future<std::shared_ptr<Result>> RawAsyncTable::Get(const hbase::Get& get) { http://git-wip-us.apache.org/repos/asf/hbase/blob/915d89f5/hbase-native-client/core/raw-async-table.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h index 527c7be..bbdc6bd 100644 --- a/hbase-native-client/core/raw-async-table.h +++ b/hbase-native-client/core/raw-async-table.h @@ -18,12 +18,12 @@ */ #pragma once +#include <folly/futures/Future.h> + #include <chrono> #include <memory> #include <string> -#include <folly/futures/Future.h> - #include "core/async-connection.h" #include "core/async-rpc-retrying-caller-factory.h" #include "core/async-rpc-retrying-caller.h" @@ -66,8 +66,8 @@ class RawAsyncTable { folly::Future<RESP> Call( std::shared_ptr<RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller, std::shared_ptr<RegionLocation> loc, const REQ& req, - const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string>& req_converter, - const RespConverter<RESP, PRESP>& resp_converter); + const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string> req_converter, + const RespConverter<RESP, PRESP> resp_converter); template <typename RESP> std::shared_ptr<SingleRequestCallerBuilder<RESP>> CreateCallerBuilder(std::string row, http://git-wip-us.apache.org/repos/asf/hbase/blob/915d89f5/hbase-native-client/core/response-converter.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/core/response-converter.cc b/hbase-native-client/core/response-converter.cc index b11856c..b2fff34 100644 --- a/hbase-native-client/core/response-converter.cc +++ b/hbase-native-client/core/response-converter.cc @@ -36,14 +36,12 @@ ResponseConverter::~ResponseConverter() {} // impl note: we are returning shared_ptr's instead of unique_ptr's because these // go inside folly::Future's, making the move semantics extremely tricky. std::shared_ptr<Result> ResponseConverter::FromGetResponse(const Response& resp) { - LOG(INFO) << "FromGetResponse"; auto get_resp = std::static_pointer_cast<GetResponse>(resp.resp_msg()); return ToResult(get_resp->result(), resp.cell_scanner()); } std::shared_ptr<Result> ResponseConverter::ToResult( const hbase::pb::Result& result, const std::unique_ptr<CellScanner>& cell_scanner) { - LOG(INFO) << "ToResult"; std::vector<std::shared_ptr<Cell>> vcells; for (auto cell : result.cell()) { std::shared_ptr<Cell> pcell = @@ -59,13 +57,11 @@ std::shared_ptr<Result> ResponseConverter::ToResult( } // TODO: check associated cell count? } - LOG(INFO) << "Returning Result"; return std::make_shared<Result>(vcells, result.exists(), result.stale(), result.partial()); } std::vector<std::shared_ptr<Result>> ResponseConverter::FromScanResponse(const Response& resp) { auto scan_resp = std::static_pointer_cast<ScanResponse>(resp.resp_msg()); - LOG(INFO) << "FromScanResponse:" << scan_resp->ShortDebugString(); int num_results = resp.cell_scanner() != nullptr ? scan_resp->cells_per_result_size() : scan_resp->results_size();