This is an automated email from the ASF dual-hosted git repository. zghao pushed a commit to branch HBASE-14850 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 18eba56d1b37233ea74c4f1a023d2c1410c0db01 Author: Enis Soztutar <[email protected]> AuthorDate: Fri Sep 1 10:55:57 2017 -0700 HBASE-18507 [C++] Support for MultiPuts in AsyncBatchRpcRetryingCaller class (Sudeep Sunthankar) --- hbase-native-client/core/action.h | 9 +- .../core/async-batch-rpc-retrying-caller.cc | 128 ++++++---- .../core/async-batch-rpc-retrying-caller.h | 12 +- .../core/async-batch-rpc-retrying-test.cc | 275 +++++++++++++++------ .../core/async-rpc-retrying-caller-factory.h | 20 +- hbase-native-client/core/client-test.cc | 148 +++++++++++ hbase-native-client/core/raw-async-table.cc | 31 ++- hbase-native-client/core/raw-async-table.h | 7 +- hbase-native-client/core/request-converter.cc | 18 +- hbase-native-client/core/table.cc | 13 + hbase-native-client/core/table.h | 6 +- 11 files changed, 506 insertions(+), 161 deletions(-) diff --git a/hbase-native-client/core/action.h b/hbase-native-client/core/action.h index 21a0181..a00f079 100644 --- a/hbase-native-client/core/action.h +++ b/hbase-native-client/core/action.h @@ -20,22 +20,21 @@ #pragma once #include <memory> -#include "core/get.h" +#include "core/row.h" namespace hbase { - class Action { public: - Action(std::shared_ptr<hbase::Get> action, int32_t original_index) + Action(std::shared_ptr<hbase::Row> action, int32_t original_index) : action_(action), original_index_(original_index) {} ~Action() {} int32_t original_index() const { return original_index_; } - std::shared_ptr<hbase::Get> action() const { return action_; } + std::shared_ptr<hbase::Row> action() const { return action_; } private: - std::shared_ptr<hbase::Get> action_; + std::shared_ptr<hbase::Row> action_; int32_t original_index_; int64_t nonce_ = -1; int32_t replica_id_ = -1; diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc index 0d67b17..dfbf7e7 100644 --- a/hbase-native-client/core/async-batch-rpc-retrying-caller.cc +++ 
b/hbase-native-client/core/async-batch-rpc-retrying-caller.cc @@ -32,11 +32,12 @@ using std::chrono::milliseconds; namespace hbase { -AsyncBatchRpcRetryingCaller::AsyncBatchRpcRetryingCaller( +template <typename REQ, typename RESP> +AsyncBatchRpcRetryingCaller<REQ, RESP>::AsyncBatchRpcRetryingCaller( std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer, - std::shared_ptr<TableName> table_name, const std::vector<hbase::Get> &actions, - nanoseconds pause_ns, int32_t max_attempts, nanoseconds operation_timeout_ns, - nanoseconds rpc_timeout_ns, int32_t start_log_errors_count) + std::shared_ptr<TableName> table_name, const std::vector<REQ> &actions, nanoseconds pause_ns, + int32_t max_attempts, nanoseconds operation_timeout_ns, nanoseconds rpc_timeout_ns, + int32_t start_log_errors_count) : conn_(conn), retry_timer_(retry_timer), table_name_(table_name), @@ -56,29 +57,31 @@ AsyncBatchRpcRetryingCaller::AsyncBatchRpcRetryingCaller( max_attempts_ = ConnectionUtils::Retries2Attempts(max_attempts); uint32_t index = 0; for (auto row : actions) { - actions_.push_back(std::make_shared<Action>(std::make_shared<hbase::Get>(row), index)); - Promise<std::shared_ptr<Result>> prom{}; - action2promises_.insert( - std::pair<uint64_t, Promise<std::shared_ptr<Result>>>(index, std::move(prom))); + actions_.push_back(std::make_shared<Action>(row, index)); + Promise<RESP> prom{}; + action2promises_.insert(std::pair<uint64_t, Promise<RESP>>(index, std::move(prom))); action2futures_.push_back(action2promises_[index++].getFuture()); } } -AsyncBatchRpcRetryingCaller::~AsyncBatchRpcRetryingCaller() {} +template <typename REQ, typename RESP> +AsyncBatchRpcRetryingCaller<REQ, RESP>::~AsyncBatchRpcRetryingCaller() {} -Future<std::vector<Try<std::shared_ptr<Result>>>> AsyncBatchRpcRetryingCaller::Call() { +template <typename REQ, typename RESP> +Future<std::vector<Try<RESP>>> AsyncBatchRpcRetryingCaller<REQ, RESP>::Call() { GroupAndSend(actions_, 1); return 
collectAll(action2futures_); } -int64_t AsyncBatchRpcRetryingCaller::RemainingTimeNs() { +template <typename REQ, typename RESP> +int64_t AsyncBatchRpcRetryingCaller<REQ, RESP>::RemainingTimeNs() { return operation_timeout_ns_.count() - (TimeUtil::GetNowNanos() - start_ns_); } -void AsyncBatchRpcRetryingCaller::LogException(int32_t tries, - std::shared_ptr<RegionRequest> region_request, - const folly::exception_wrapper &ew, - std::shared_ptr<ServerName> server_name) { +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException( + int32_t tries, std::shared_ptr<RegionRequest> region_request, + const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) { if (tries > start_log_errors_count_) { std::string regions; regions += region_request->region_location()->region_name() + ", "; @@ -88,7 +91,8 @@ void AsyncBatchRpcRetryingCaller::LogException(int32_t tries, } } -void AsyncBatchRpcRetryingCaller::LogException( +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::LogException( int32_t tries, const std::vector<std::shared_ptr<RegionRequest>> &region_requests, const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) { if (tries > start_log_errors_count_) { @@ -102,29 +106,35 @@ void AsyncBatchRpcRetryingCaller::LogException( } } -const std::string AsyncBatchRpcRetryingCaller::GetExtraContextForError( +template <typename REQ, typename RESP> +const std::string AsyncBatchRpcRetryingCaller<REQ, RESP>::GetExtraContextForError( std::shared_ptr<ServerName> server_name) { return server_name ? 
server_name->ShortDebugString() : ""; } -void AsyncBatchRpcRetryingCaller::AddError(const std::shared_ptr<Action> &action, - const folly::exception_wrapper &ew, - std::shared_ptr<ServerName> server_name) { +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError(const std::shared_ptr<Action> &action, + const folly::exception_wrapper &ew, + std::shared_ptr<ServerName> server_name) { ThrowableWithExtraContext twec(ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); AddAction2Error(action->original_index(), twec); } -void AsyncBatchRpcRetryingCaller::AddError(const std::vector<std::shared_ptr<Action>> &actions, - const folly::exception_wrapper &ew, - std::shared_ptr<ServerName> server_name) { +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddError( + const std::vector<std::shared_ptr<Action>> &actions, const folly::exception_wrapper &ew, + std::shared_ptr<ServerName> server_name) { for (const auto action : actions) { AddError(action, ew, server_name); } } -void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action, int32_t tries, - const folly::exception_wrapper &ew, int64_t current_time, - const std::string extras) { +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailOne(const std::shared_ptr<Action> &action, + int32_t tries, + const folly::exception_wrapper &ew, + int64_t current_time, + const std::string extras) { auto action_index = action->original_index(); auto itr = action2promises_.find(action_index); if (itr != action2promises_.end()) { @@ -138,16 +148,18 @@ void AsyncBatchRpcRetryingCaller::FailOne(const std::shared_ptr<Action> &action, RetriesExhaustedException(tries - 1, action2errors_[action_index])); } -void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions, - int32_t tries, const folly::exception_wrapper &ew, - std::shared_ptr<ServerName> server_name) { +template 
<typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll( + const std::vector<std::shared_ptr<Action>> &actions, int32_t tries, + const folly::exception_wrapper &ew, std::shared_ptr<ServerName> server_name) { for (const auto action : actions) { FailOne(action, tries, ew, TimeUtil::GetNowNanos(), GetExtraContextForError(server_name)); } } -void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Action>> &actions, - int32_t tries) { +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::FailAll( + const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) { for (const auto action : actions) { auto action_index = action->original_index(); auto itr = action2promises_.find(action_index); @@ -159,8 +171,9 @@ void AsyncBatchRpcRetryingCaller::FailAll(const std::vector<std::shared_ptr<Acti } } -void AsyncBatchRpcRetryingCaller::AddAction2Error(uint64_t action_index, - const ThrowableWithExtraContext &twec) { +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::AddAction2Error( + uint64_t action_index, const ThrowableWithExtraContext &twec) { auto erritr = action2errors_.find(action_index); if (erritr != action2errors_.end()) { erritr->second->push_back(twec); @@ -171,9 +184,11 @@ void AsyncBatchRpcRetryingCaller::AddAction2Error(uint64_t action_index, return; } -void AsyncBatchRpcRetryingCaller::OnError(const ActionsByRegion &actions_by_region, int32_t tries, - const folly::exception_wrapper &ew, - std::shared_ptr<ServerName> server_name) { +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnError(const ActionsByRegion &actions_by_region, + int32_t tries, + const folly::exception_wrapper &ew, + std::shared_ptr<ServerName> server_name) { std::vector<std::shared_ptr<Action>> copied_actions; std::vector<std::shared_ptr<RegionRequest>> region_requests; for (const auto &action_by_region : actions_by_region) { @@ -192,8 
+207,9 @@ void AsyncBatchRpcRetryingCaller::OnError(const ActionsByRegion &actions_by_regi TryResubmit(copied_actions, tries); } -void AsyncBatchRpcRetryingCaller::TryResubmit(const std::vector<std::shared_ptr<Action>> &actions, - int32_t tries) { +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::TryResubmit( + const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) { int64_t delay_ns; if (operation_timeout_ns_.count() > 0) { int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs; @@ -213,9 +229,10 @@ void AsyncBatchRpcRetryingCaller::TryResubmit(const std::vector<std::shared_ptr< }); } +template <typename REQ, typename RESP> Future<std::vector<Try<std::shared_ptr<RegionLocation>>>> -AsyncBatchRpcRetryingCaller::GetRegionLocations(const std::vector<std::shared_ptr<Action>> &actions, - int64_t locate_timeout_ns) { +AsyncBatchRpcRetryingCaller<REQ, RESP>::GetRegionLocations( + const std::vector<std::shared_ptr<Action>> &actions, int64_t locate_timeout_ns) { auto locs = std::vector<Future<std::shared_ptr<RegionLocation>>>{}; for (auto const &action : actions) { locs.push_back(location_cache_->LocateRegion(*table_name_, action->action()->row(), @@ -225,8 +242,9 @@ AsyncBatchRpcRetryingCaller::GetRegionLocations(const std::vector<std::shared_pt return collectAll(locs); } -void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr<Action>> &actions, - int32_t tries) { +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::GroupAndSend( + const std::vector<std::shared_ptr<Action>> &actions, int32_t tries) { int64_t locate_timeout_ns; if (operation_timeout_ns_.count() > 0) { locate_timeout_ns = RemainingTimeNs(); @@ -300,8 +318,9 @@ void AsyncBatchRpcRetryingCaller::GroupAndSend(const std::vector<std::shared_ptr return; } -Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller::GetMultiResponse( - const ActionsByServer 
&actions_by_server) { +template <typename REQ, typename RESP> +Future<std::vector<Try<std::unique_ptr<Response>>>> +AsyncBatchRpcRetryingCaller<REQ, RESP>::GetMultiResponse(const ActionsByServer &actions_by_server) { auto multi_calls = std::vector<Future<std::unique_ptr<hbase::Response>>>{}; auto user = User::defaultUser(); for (const auto &action_by_server : actions_by_server) { @@ -315,7 +334,9 @@ Future<std::vector<Try<std::unique_ptr<Response>>>> AsyncBatchRpcRetryingCaller: return collectAll(multi_calls); } -void AsyncBatchRpcRetryingCaller::Send(const ActionsByServer &actions_by_server, int32_t tries) { +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::Send(const ActionsByServer &actions_by_server, + int32_t tries) { int64_t remaining_ns; if (operation_timeout_ns_.count() > 0) { remaining_ns = RemainingTimeNs(); @@ -371,7 +392,8 @@ void AsyncBatchRpcRetryingCaller::Send(const ActionsByServer &actions_by_server, return; } -void AsyncBatchRpcRetryingCaller::OnComplete( +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete( const ActionsByRegion &actions_by_region, int32_t tries, const std::shared_ptr<ServerName> server_name, const std::unique_ptr<hbase::MultiResponse> multi_response) { @@ -418,12 +440,12 @@ void AsyncBatchRpcRetryingCaller::OnComplete( return; } -void AsyncBatchRpcRetryingCaller::OnComplete(const std::shared_ptr<Action> &action, - const std::shared_ptr<RegionRequest> &region_request, - int32_t tries, - const std::shared_ptr<ServerName> &server_name, - const std::shared_ptr<RegionResult> &region_result, - std::vector<std::shared_ptr<Action>> &failed_actions) { +template <typename REQ, typename RESP> +void AsyncBatchRpcRetryingCaller<REQ, RESP>::OnComplete( + const std::shared_ptr<Action> &action, const std::shared_ptr<RegionRequest> &region_request, + int32_t tries, const std::shared_ptr<ServerName> &server_name, + const std::shared_ptr<RegionResult> &region_result, + 
std::vector<std::shared_ptr<Action>> &failed_actions) { std::string err_msg; try { auto result_or_exc = region_result->ResultOrException(action->original_index()); @@ -461,4 +483,6 @@ void AsyncBatchRpcRetryingCaller::OnComplete(const std::shared_ptr<Action> &acti return; } +template class AsyncBatchRpcRetryingCaller<std::shared_ptr<hbase::Row>, + std::shared_ptr<hbase::Result>>; } /* namespace hbase */ diff --git a/hbase-native-client/core/async-batch-rpc-retrying-caller.h b/hbase-native-client/core/async-batch-rpc-retrying-caller.h index 194c439..9194b04 100644 --- a/hbase-native-client/core/async-batch-rpc-retrying-caller.h +++ b/hbase-native-client/core/async-batch-rpc-retrying-caller.h @@ -84,6 +84,7 @@ struct ServerNameHash { } }; +template <typename REQ, typename RESP> class AsyncBatchRpcRetryingCaller { public: using ActionsByServer = @@ -94,15 +95,14 @@ class AsyncBatchRpcRetryingCaller { AsyncBatchRpcRetryingCaller(std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer, std::shared_ptr<pb::TableName> table_name, - const std::vector<hbase::Get> &actions, - std::chrono::nanoseconds pause_ns, int32_t max_attempts, - std::chrono::nanoseconds operation_timeout_ns, + const std::vector<REQ> &actions, std::chrono::nanoseconds pause_ns, + int32_t max_attempts, std::chrono::nanoseconds operation_timeout_ns, std::chrono::nanoseconds rpc_timeout_ns, int32_t start_log_errors_count); ~AsyncBatchRpcRetryingCaller(); - folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Call(); + folly::Future<std::vector<folly::Try<RESP>>> Call(); private: int64_t RemainingTimeNs(); @@ -172,8 +172,8 @@ class AsyncBatchRpcRetryingCaller { int64_t start_ns_ = TimeUtil::GetNowNanos(); int32_t tries_ = 1; - std::map<uint64_t, folly::Promise<std::shared_ptr<Result>>> action2promises_; - std::vector<folly::Future<std::shared_ptr<Result>>> action2futures_; + std::map<uint64_t, folly::Promise<RESP>> action2promises_; + 
std::vector<folly::Future<RESP>> action2futures_; std::map<uint64_t, std::shared_ptr<std::vector<ThrowableWithExtraContext>>> action2errors_; std::shared_ptr<AsyncRegionLocator> location_cache_ = nullptr; diff --git a/hbase-native-client/core/async-batch-rpc-retrying-test.cc b/hbase-native-client/core/async-batch-rpc-retrying-test.cc index cad03e1..b8a0b81 100644 --- a/hbase-native-client/core/async-batch-rpc-retrying-test.cc +++ b/hbase-native-client/core/async-batch-rpc-retrying-test.cc @@ -230,7 +230,12 @@ class MockAsyncConnection : public AsyncConnection, return retry_executor_; } - void Close() override {} + void Close() override { + retry_timer_->destroy(); + retry_executor_->stop(); + io_executor_->stop(); + cpu_executor_->stop(); + } std::shared_ptr<HBaseRpcController> CreateRpcController() override { return std::make_shared<HBaseRpcController>(); } @@ -254,15 +259,15 @@ class MockRawAsyncTableImpl { virtual ~MockRawAsyncTableImpl() = default; /* implement this in real RawAsyncTableImpl. 
*/ - folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Gets( - const std::vector<hbase::Get> &gets) { + template <typename REQ, typename RESP> + folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ> &rows) { /* init request caller builder */ - auto builder = conn_->caller_factory()->Batch(); + auto builder = conn_->caller_factory()->Batch<REQ, RESP>(); /* call with retry to get result */ auto async_caller = builder->table(tn_) - ->actions(std::make_shared<std::vector<hbase::Get>>(gets)) + ->actions(std::make_shared<std::vector<REQ>>(rows)) ->rpc_timeout(conn_->connection_conf()->read_rpc_timeout()) ->operation_timeout(conn_->connection_conf()->operation_timeout()) ->pause(conn_->connection_conf()->pause()) @@ -278,9 +283,7 @@ class MockRawAsyncTableImpl { std::shared_ptr<hbase::pb::TableName> tn_; }; -void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, - const std::string &table_name, bool split_regions, uint32_t tries = 3, - uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) { +std::string createTestTable(bool split_regions, const std::string &table_name) { std::vector<std::string> keys{"test0", "test100", "test200", "test300", "test400", "test500", "test600", "test700", "test800", "test900"}; std::string tableName = (split_regions) ? 
("split-" + table_name) : table_name; @@ -289,30 +292,12 @@ void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, } else { AsyncBatchRpcRetryTest::test_util->CreateTable(tableName, "d"); } + return tableName; +} - // Create TableName and Row to be fetched from HBase - auto tn = folly::to<hbase::pb::TableName>(tableName); - - // Create a client - Client client(*AsyncBatchRpcRetryTest::test_util->conf()); - - // Get connection to HBase Table - auto table = client.Table(tn); - - for (uint64_t i = 0; i < num_rows; i++) { - table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i), - "value" + std::to_string(i))); - } - - std::map<std::string, std::shared_ptr<RegionLocation>> region_locations; - std::vector<hbase::Get> gets; - for (uint64_t i = 0; i < num_rows; ++i) { - auto row = "test" + std::to_string(i); - hbase::Get get(row); - gets.push_back(get); - region_locations[row] = table->GetRegionLocation(row); - } - +std::shared_ptr<MockAsyncConnection> getAsyncConnection( + Client &client, uint32_t operation_timeout_millis, uint32_t tries, + std::shared_ptr<AsyncRegionLocatorBase> region_locator) { /* init region location and rpc channel */ auto cpu_executor_ = std::make_shared<wangle::CPUThreadPoolExecutor>(4); auto io_executor_ = client.async_connection()->io_executor(); @@ -332,35 +317,90 @@ void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, tries, // max retries 1); // start log errors count - /* set region locator */ - region_locator->set_region_location(region_locations); - - /* init hbase client connection */ - auto conn = std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, cpu_executor_, - io_executor_, retry_executor_, rpc_client, - region_locator); - conn->Init(); - - /* init retry caller factory */ - auto tableImpl = - std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn)); + return std::make_shared<MockAsyncConnection>(connection_conf, retry_timer, 
cpu_executor_, + io_executor_, retry_executor_, rpc_client, + region_locator); +} - auto tresults = tableImpl->Gets(gets).get(milliseconds(operation_timeout_millis)); +template <typename ACTION> +std::vector<std::shared_ptr<hbase::Row>> getRows(std::vector<ACTION> actions) { + std::vector<std::shared_ptr<hbase::Row>> rows; + for (auto action : actions) { + std::shared_ptr<hbase::Row> srow = std::make_shared<ACTION>(action); + rows.push_back(srow); + } + return rows; +} - ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty."; +template <typename REQ, typename RESP> +std::vector<std::shared_ptr<hbase::Result>> getResults(std::vector<REQ> &actions, + std::vector<folly::Try<RESP>> &tresults) { std::vector<std::shared_ptr<hbase::Result>> results{}; - uint32_t num = 0; + uint64_t num = 0; for (auto tresult : tresults) { if (tresult.hasValue()) { results.push_back(tresult.value()); } else if (tresult.hasException()) { folly::exception_wrapper ew = tresult.exception(); - LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for " << gets[num].row(); + LOG(ERROR) << "Caught exception:- " << ew.what().toStdString() << " for " + << actions[num].row(); throw ew; } ++num; } + return results; +} + +template <typename ACTION> +std::map<std::string, std::shared_ptr<RegionLocation>> getRegionLocationsAndActions( + uint64_t num_rows, std::vector<ACTION> &actions, std::shared_ptr<Table> table) { + std::map<std::string, std::shared_ptr<RegionLocation>> region_locations; + for (uint64_t i = 0; i < num_rows; ++i) { + auto row = "test" + std::to_string(i); + ACTION action(row); + actions.push_back(action); + region_locations[row] = table->GetRegionLocation(row); + } + return region_locations; +} + +void runMultiGets(std::shared_ptr<AsyncRegionLocatorBase> region_locator, + const std::string &table_name, bool split_regions, uint32_t tries = 3, + uint32_t operation_timeout_millis = 600000, uint64_t num_rows = 1000) { + auto tableName = 
createTestTable(split_regions, table_name); + // Create TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>(tableName); + + // Create a client + Client client(*AsyncBatchRpcRetryTest::test_util->conf()); + + // Get connection to HBase Table + std::shared_ptr<Table> table = client.Table(tn); + + for (uint64_t i = 0; i < num_rows; i++) { + table->Put(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i), + "value" + std::to_string(i))); + } + std::vector<hbase::Get> gets; + auto region_locations = getRegionLocationsAndActions<hbase::Get>(num_rows, gets, table); + + /* set region locator */ + region_locator->set_region_location(region_locations); + /* init hbase client connection */ + auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator); + conn->Init(); + + /* init retry caller factory */ + auto tableImpl = + std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn)); + + std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Get>(gets); + auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get( + milliseconds(operation_timeout_millis)); + ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty."; + + auto results = getResults<hbase::Get, std::shared_ptr<Result>>(gets, tresults); // Test the values, should be same as in put executed on hbase shell ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty."; uint32_t i = 0; @@ -371,101 +411,184 @@ void runMultiTest(std::shared_ptr<AsyncRegionLocatorBase> region_locator, EXPECT_EQ("value" + std::to_string(i), results[i]->Value("d", std::to_string(i)).value()); } - retry_timer->destroy(); table->Close(); client.Close(); - retry_executor_->stop(); - io_executor_->stop(); - cpu_executor_->stop(); + conn->Close(); +} + +void runMultiPuts(std::shared_ptr<AsyncRegionLocatorBase> region_locator, + const std::string &table_name, bool split_regions, uint32_t 
tries = 3, + uint32_t operation_timeout_millis = 600000, uint32_t num_rows = 1000) { + auto tableName = createTestTable(split_regions, table_name); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>(tableName); + + // Create a client + Client client(*AsyncBatchRpcRetryTest::test_util->conf()); + + // Get connection to HBase Table + std::shared_ptr<Table> table = client.Table(tn); + + std::vector<hbase::Put> puts; + auto region_locations = getRegionLocationsAndActions<hbase::Put>(num_rows, puts, table); + + /* set region locator */ + region_locator->set_region_location(region_locations); + + /* init hbase client connection */ + auto conn = getAsyncConnection(client, operation_timeout_millis, tries, region_locator); + conn->Init(); + + /* init retry caller factory */ + auto tableImpl = + std::make_shared<MockRawAsyncTableImpl>(conn, std::make_shared<hbase::pb::TableName>(tn)); + + std::vector<std::shared_ptr<hbase::Row>> rows = getRows<hbase::Put>(puts); + auto tresults = tableImpl->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>(rows).get( + milliseconds(operation_timeout_millis)); + ASSERT_TRUE(!tresults.empty()) << "tresults shouldn't be empty."; + + auto results = getResults<hbase::Put, std::shared_ptr<Result>>(puts, tresults); + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!results.empty()) << "Results shouldn't be empty."; + + table->Close(); + client.Close(); + conn->Close(); } // Test successful case TEST_F(AsyncBatchRpcRetryTest, MultiGets) { std::shared_ptr<AsyncRegionLocatorBase> region_locator( std::make_shared<MockAsyncRegionLocator>()); - runMultiTest(region_locator, "table1", false); + runMultiGets(region_locator, "table1", false); } // Tests the RPC failing 3 times, then succeeding TEST_F(AsyncBatchRpcRetryTest, HandleException) { std::shared_ptr<AsyncRegionLocatorBase> region_locator( std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); - 
runMultiTest(region_locator, "table2", false, 5); + runMultiGets(region_locator, "table2", false, 5); } // Tests the RPC failing 4 times, throwing an exception TEST_F(AsyncBatchRpcRetryTest, FailWithException) { std::shared_ptr<AsyncRegionLocatorBase> region_locator( std::make_shared<MockWrongRegionAsyncRegionLocator>(4)); - EXPECT_ANY_THROW(runMultiTest(region_locator, "table3", false)); + EXPECT_ANY_THROW(runMultiGets(region_locator, "table3", false)); } // Tests the region location lookup failing 3 times, then succeeding TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookup) { std::shared_ptr<AsyncRegionLocatorBase> region_locator( std::make_shared<MockFailingAsyncRegionLocator>(3)); - runMultiTest(region_locator, "table4", false); + runMultiGets(region_locator, "table4", false); } // Tests the region location lookup failing 5 times, throwing an exception TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookup) { std::shared_ptr<AsyncRegionLocatorBase> region_locator( std::make_shared<MockFailingAsyncRegionLocator>(4)); - EXPECT_ANY_THROW(runMultiTest(region_locator, "table5", false, 3)); + EXPECT_ANY_THROW(runMultiGets(region_locator, "table5", false, 3)); } // Tests hitting operation timeout, thus not retrying anymore TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeout) { std::shared_ptr<AsyncRegionLocatorBase> region_locator( std::make_shared<MockFailingAsyncRegionLocator>(6)); - EXPECT_ANY_THROW(runMultiTest(region_locator, "table6", false, 5, 100, 1000)); + EXPECT_ANY_THROW(runMultiGets(region_locator, "table6", false, 5, 100, 1000)); } -/* - TODO: Below tests are failing with frequently with segfaults coming from - JNI internals indicating that we are doing something wrong in the JNI boundary. - However, we were not able to debug furhter yet. Disable the tests for now, and - come back later to fix the issue. 
- +////////////////////// // Test successful case -TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) { +TEST_F(AsyncBatchRpcRetryTest, MultiPuts) { std::shared_ptr<AsyncRegionLocatorBase> region_locator( std::make_shared<MockAsyncRegionLocator>()); - runMultiTest(region_locator, "table7", true); + runMultiPuts(region_locator, "table1", false); } // Tests the RPC failing 3 times, then succeeding -TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) { +TEST_F(AsyncBatchRpcRetryTest, PutsHandleException) { std::shared_ptr<AsyncRegionLocatorBase> region_locator( std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); - runMultiTest(region_locator, "table8", true, 5); + runMultiPuts(region_locator, "table2", false, 5); } // Tests the RPC failing 4 times, throwing an exception -TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) { +TEST_F(AsyncBatchRpcRetryTest, PutsFailWithException) { std::shared_ptr<AsyncRegionLocatorBase> region_locator( std::make_shared<MockWrongRegionAsyncRegionLocator>(4)); - EXPECT_ANY_THROW(runMultiTest(region_locator, "table9", true)); + EXPECT_ANY_THROW(runMultiPuts(region_locator, "table3", false)); } // Tests the region location lookup failing 3 times, then succeeding -TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) { +TEST_F(AsyncBatchRpcRetryTest, PutsHandleExceptionFromRegionLocationLookup) { std::shared_ptr<AsyncRegionLocatorBase> region_locator( std::make_shared<MockFailingAsyncRegionLocator>(3)); - runMultiTest(region_locator, "table10", true); + runMultiPuts(region_locator, "table4", false); } // Tests the region location lookup failing 5 times, throwing an exception -TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) { +TEST_F(AsyncBatchRpcRetryTest, PutsFailWithExceptionFromRegionLocationLookup) { std::shared_ptr<AsyncRegionLocatorBase> region_locator( std::make_shared<MockFailingAsyncRegionLocator>(4)); - 
EXPECT_ANY_THROW(runMultiTest(region_locator, "table11", true, 3)); + EXPECT_ANY_THROW(runMultiPuts(region_locator, "table5", false, 3)); } // Tests hitting operation timeout, thus not retrying anymore -TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) { +TEST_F(AsyncBatchRpcRetryTest, PutsFailWithOperationTimeout) { std::shared_ptr<AsyncRegionLocatorBase> region_locator( std::make_shared<MockFailingAsyncRegionLocator>(6)); - EXPECT_ANY_THROW(runMultiTest(region_locator, "table12", true, 5, 100, 1000)); + EXPECT_ANY_THROW(runMultiPuts(region_locator, "table6", false, 5, 100, 1000)); } -*/ + +////////////////////// +/* + TODO: Below tests are failing with frequently with segfaults coming from + JNI internals indicating that we are doing something wrong in the JNI boundary. + However, we were not able to debug furhter yet. Disable the tests for now, and + come back later to fix the issue. + + // Test successful case + TEST_F(AsyncBatchRpcRetryTest, MultiGetsSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockAsyncRegionLocator>()); + runMultiGets(region_locator, "table7", true); + } + + // Tests the RPC failing 3 times, then succeeding + TEST_F(AsyncBatchRpcRetryTest, HandleExceptionSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(3)); + runMultiGets(region_locator, "table8", true, 5); + } + + // Tests the RPC failing 4 times, throwing an exception + TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockWrongRegionAsyncRegionLocator>(4)); + EXPECT_ANY_THROW(runMultiGets(region_locator, "table9", true)); + } + + // Tests the region location lookup failing 3 times, then succeeding + TEST_F(AsyncBatchRpcRetryTest, HandleExceptionFromRegionLocationLookupSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + 
std::make_shared<MockFailingAsyncRegionLocator>(3)); + runMultiGets(region_locator, "table10", true); + } + + // Tests the region location lookup failing 5 times, throwing an exception + TEST_F(AsyncBatchRpcRetryTest, FailWithExceptionFromRegionLocationLookupSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(4)); + EXPECT_ANY_THROW(runMultiGets(region_locator, "table11", true, 3)); + } + + // Tests hitting operation timeout, thus not retrying anymore + TEST_F(AsyncBatchRpcRetryTest, FailWithOperationTimeoutSplitRegions) { + std::shared_ptr<AsyncRegionLocatorBase> region_locator( + std::make_shared<MockFailingAsyncRegionLocator>(6)); + EXPECT_ANY_THROW(runMultiGets(region_locator, "table12", true, 5, 100, 1000)); + } + */ diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h index 1af6e72..188f469 100644 --- a/hbase-native-client/core/async-rpc-retrying-caller-factory.h +++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -135,7 +135,8 @@ class SingleRequestCallerBuilder Callable<RESP> callable_; }; // end of SingleRequestCallerBuilder -class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilder> { +template <typename REQ, typename RESP> +class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilder<REQ, RESP>> { public: explicit BatchCallerBuilder(std::shared_ptr<AsyncConnection> conn, std::shared_ptr<folly::HHWheelTimer> retry_timer) @@ -143,14 +144,14 @@ class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilde virtual ~BatchCallerBuilder() = default; - typedef std::shared_ptr<BatchCallerBuilder> SharedThisPtr; + typedef std::shared_ptr<BatchCallerBuilder<REQ, RESP>> SharedThisPtr; SharedThisPtr table(std::shared_ptr<pb::TableName> table_name) { table_name_ = table_name; return shared_this(); } - SharedThisPtr 
actions(std::shared_ptr<std::vector<hbase::Get>> actions) { + SharedThisPtr actions(std::shared_ptr<std::vector<REQ>> actions) { actions_ = actions; return shared_this(); } @@ -180,10 +181,10 @@ class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilde return shared_this(); } - folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Call() { return Build()->Call(); } + folly::Future<std::vector<folly::Try<RESP>>> Call() { return Build()->Call(); } - std::shared_ptr<AsyncBatchRpcRetryingCaller> Build() { - return std::make_shared<AsyncBatchRpcRetryingCaller>( + std::shared_ptr<AsyncBatchRpcRetryingCaller<REQ, RESP>> Build() { + return std::make_shared<AsyncBatchRpcRetryingCaller<REQ, RESP>>( conn_, retry_timer_, table_name_, *actions_, pause_ns_, max_attempts_, operation_timeout_nanos_, rpc_timeout_nanos_, start_log_errors_count_); } @@ -197,7 +198,7 @@ class BatchCallerBuilder : public std::enable_shared_from_this<BatchCallerBuilde std::shared_ptr<AsyncConnection> conn_; std::shared_ptr<folly::HHWheelTimer> retry_timer_; std::shared_ptr<hbase::pb::TableName> table_name_ = nullptr; - std::shared_ptr<std::vector<hbase::Get>> actions_ = nullptr; + std::shared_ptr<std::vector<REQ>> actions_ = nullptr; std::chrono::nanoseconds pause_ns_; int32_t max_attempts_ = 0; std::chrono::nanoseconds operation_timeout_nanos_; @@ -329,8 +330,9 @@ class AsyncRpcRetryingCallerFactory { return std::make_shared<SingleRequestCallerBuilder<RESP>>(conn_, retry_timer_); } - std::shared_ptr<BatchCallerBuilder> Batch() { - return std::make_shared<BatchCallerBuilder>(conn_, retry_timer_); + template <typename REQ, typename RESP> + std::shared_ptr<BatchCallerBuilder<REQ, RESP>> Batch() { + return std::make_shared<BatchCallerBuilder<REQ, RESP>>(conn_, retry_timer_); } std::shared_ptr<ScanCallerBuilder> Scan() { diff --git a/hbase-native-client/core/client-test.cc b/hbase-native-client/core/client-test.cc index 1c9b709..3f72880 100644 --- 
a/hbase-native-client/core/client-test.cc +++ b/hbase-native-client/core/client-test.cc @@ -547,3 +547,151 @@ TEST_F(ClientTest, MultiGetsWithRegionSplits) { table->Close(); client.Close(); } + +void PerformMultiPuts(uint64_t num_rows, std::shared_ptr<hbase::Client> client, + const std::string &table_name) { + auto tn = folly::to<hbase::pb::TableName>(table_name); + auto table = client->Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + std::vector<hbase::Put> puts; + // Perform Puts + for (uint64_t i = 0; i < num_rows; i++) { + puts.push_back(Put{"test" + std::to_string(i)}.AddColumn("d", std::to_string(i), + "value" + std::to_string(i))); + } + table->Put(puts); +} + +void PerformMultiPuts(std::vector<hbase::Put> &puts, std::shared_ptr<Table> table) { + table->Put(puts); +} + +TEST_F(ClientTest, MultiGetsWithMultiPuts) { + std::string table_name = "t"; + // Using TestUtil to populate test data + ClientTest::test_util->CreateTable(table_name, "d"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>(table_name); + + SetClientParams(); + // Create a client + hbase::Client client(*ClientTest::test_util->conf()); + + uint64_t num_rows = 50000; + PerformMultiPuts(num_rows, std::make_shared<hbase::Client>(client), table_name); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + std::vector<hbase::Get> gets; + MakeGets(num_rows, "test", gets); + + auto results = table->Get(gets); + + TestMultiResults(num_rows, results, gets); + + table->Close(); + client.Close(); +} + +TEST_F(ClientTest, MultiGetsWithMultiPutsAndSplitRegions) { + // Using TestUtil to populate test data + std::vector<std::string> keys{"test0", "test100", "test200", "test300", "test400", + "test500", "test600", "test700", "test800", "test900"}; + std::string table_name = "t"; + ClientTest::test_util->CreateTable(table_name, "d", keys); + + // Create 
TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>(table_name); + + SetClientParams(); + + // Create a client + hbase::Client client(*ClientTest::test_util->conf()); + + uint64_t num_rows = 50000; + PerformMultiPuts(num_rows, std::make_shared<hbase::Client>(client), table_name); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + std::vector<hbase::Get> gets; + MakeGets(num_rows, "test", gets); + + auto results = table->Get(gets); + + TestMultiResults(num_rows, results, gets); + + table->Close(); + client.Close(); +} + +TEST_F(ClientTest, MultiPuts) { + std::string table_name = "t"; + // Using TestUtil to populate test data + ClientTest::test_util->CreateTable(table_name, "d"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>(table_name); + + SetClientParams(); + // Create a client + hbase::Client client(*ClientTest::test_util->conf()); + std::shared_ptr<Table> table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + uint64_t num_rows = 80000; + uint64_t batch_num_rows = 10000; + std::vector<hbase::Put> puts; + for (uint64_t i = 0; i < num_rows;) { + puts.clear(); + // accumulate batch_num_rows at a time + for (uint64_t j = 0; j < batch_num_rows && i < num_rows; ++j) { + hbase::Put put("test" + std::to_string(i)); + put.AddColumn("d", std::to_string(i), "value" + std::to_string(i)); + puts.push_back(put); + i++; + } + PerformMultiPuts(puts, table); + } + table->Close(); + client.Close(); +} + +TEST_F(ClientTest, MultiPutsWithRegionSplits) { + // Using TestUtil to populate test data + std::vector<std::string> keys{"test0", "test100", "test200", "test300", "test400", + "test500", "test600", "test700", "test800", "test900"}; + std::string table_name = "t"; + ClientTest::test_util->CreateTable(table_name, "d", keys); + + // Create TableName and Row to be fetched 
from HBase + auto tn = folly::to<hbase::pb::TableName>(table_name); + + SetClientParams(); + + // Create a client + hbase::Client client(*ClientTest::test_util->conf()); + std::shared_ptr<Table> table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + uint64_t num_rows = 80000; + uint64_t batch_num_rows = 10000; + std::vector<hbase::Put> puts; + for (uint64_t i = 0; i < num_rows;) { + puts.clear(); + // accumulate batch_num_rows at a time + for (uint64_t j = 0; j < batch_num_rows && i < num_rows; ++j) { + hbase::Put put("test" + std::to_string(i)); + put.AddColumn("d", std::to_string(i), "value" + std::to_string(i)); + puts.push_back(put); + i++; + } + PerformMultiPuts(puts, table); + } + table->Close(); + client.Close(); +} diff --git a/hbase-native-client/core/raw-async-table.cc b/hbase-native-client/core/raw-async-table.cc index 53ab526..409883f 100644 --- a/hbase-native-client/core/raw-async-table.cc +++ b/hbase-native-client/core/raw-async-table.cc @@ -197,18 +197,26 @@ folly::Future<std::shared_ptr<Result>> RawAsyncTable::Append(const hbase::Append return caller->Call().then([caller](const auto r) { return r; }); } + folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Get( const std::vector<hbase::Get>& gets) { - return this->Batch(gets); + std::vector<std::shared_ptr<hbase::Row>> rows; + for (auto get : gets) { + std::shared_ptr<hbase::Row> srow = std::make_shared<hbase::Get>(get); + rows.push_back(srow); + } + return this->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>( + rows, connection_conf_->read_rpc_timeout()); } -folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Batch( - const std::vector<hbase::Get>& gets) { +template <typename REQ, typename RESP> +folly::Future<std::vector<folly::Try<RESP>>> RawAsyncTable::Batch( + const std::vector<REQ>& rows, std::chrono::nanoseconds timeout) { auto caller = connection_->caller_factory() - ->Batch() + 
->Batch<REQ, RESP>() ->table(table_name_) - ->actions(std::make_shared<std::vector<hbase::Get>>(gets)) - ->rpc_timeout(connection_conf_->read_rpc_timeout()) + ->actions(std::make_shared<std::vector<REQ>>(rows)) + ->rpc_timeout(timeout) ->operation_timeout(connection_conf_->operation_timeout()) ->pause(connection_conf_->pause()) ->max_attempts(connection_conf_->max_retries()) @@ -237,4 +245,15 @@ std::shared_ptr<hbase::Scan> RawAsyncTable::SetDefaultScanConfig(const hbase::Sc } return new_scan; } + +folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> RawAsyncTable::Put( + const std::vector<hbase::Put>& puts) { + std::vector<std::shared_ptr<hbase::Row>> rows; + for (auto put : puts) { + std::shared_ptr<hbase::Row> srow = std::make_shared<hbase::Put>(put); + rows.push_back(srow); + } + return this->Batch<std::shared_ptr<hbase::Row>, std::shared_ptr<Result>>( + rows, connection_conf_->write_rpc_timeout()); +} } // namespace hbase diff --git a/hbase-native-client/core/raw-async-table.h b/hbase-native-client/core/raw-async-table.h index e651f8a..97eef7f 100644 --- a/hbase-native-client/core/raw-async-table.h +++ b/hbase-native-client/core/raw-async-table.h @@ -83,8 +83,11 @@ class RawAsyncTable { folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Get( const std::vector<hbase::Get>& gets); - folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Batch( - const std::vector<hbase::Get>& gets); + template <typename REQ, typename RESP> + folly::Future<std::vector<folly::Try<RESP>>> Batch(const std::vector<REQ>& rows, + std::chrono::nanoseconds timeout); + folly::Future<std::vector<folly::Try<std::shared_ptr<Result>>>> Put( + const std::vector<hbase::Put>& puts); private: /* Data */ diff --git a/hbase-native-client/core/request-converter.cc b/hbase-native-client/core/request-converter.cc index 47c09d1..f48f228 100644 --- a/hbase-native-client/core/request-converter.cc +++ b/hbase-native-client/core/request-converter.cc @@ -173,14 +173,23 @@ 
std::unique_ptr<Request> RequestConverter::ToMultiRequest( auto pb_action = pb_region_action->add_action(); auto pget = region_action->action(); // We store only hbase::Get in hbase::Action as of now. It will be changed later on. - CHECK(pget) << "Unexpected. action can't be null"; - auto pb_get = RequestConverter::ToGet(*pget); - pb_action->set_allocated_get(pb_get.release()); + CHECK(pget) << "Unexpected. action can't be null."; + std::string error_msg(""); + if (typeid(*pget) == typeid(hbase::Get)) { + auto getp = dynamic_cast<hbase::Get *>(pget.get()); + pb_action->set_allocated_get(RequestConverter::ToGet(*getp).release()); + } else if (typeid(*pget) == typeid(hbase::Put)) { + auto putp = dynamic_cast<hbase::Put *>(pget.get()); + pb_action->set_allocated_mutation( + RequestConverter::ToMutation(MutationType::MutationProto_MutationType_PUT, *putp, -1) + .release()); + } else { + throw std::runtime_error("Unexpected action type encountered."); + } pb_action->set_index(action_num); action_num++; } } - return pb_req; } @@ -355,4 +364,5 @@ std::unique_ptr<Request> RequestConverter::AppendToMutateRequest(const Append &a VLOG(3) << "Req is " << pb_req->req_msg()->ShortDebugString(); return pb_req; } + } /* namespace hbase */ diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc index 3b7a87b..f93a029 100644 --- a/hbase-native-client/core/table.cc +++ b/hbase-native-client/core/table.cc @@ -128,4 +128,17 @@ std::vector<std::shared_ptr<hbase::Result>> Table::Get(const std::vector<hbase:: return results; } +void Table::Put(const std::vector<hbase::Put> &puts) { + auto tresults = async_table_->Put(puts).get(operation_timeout()); + uint32_t num = 0; + for (auto tresult : tresults) { + if (tresult.hasException()) { + LOG(ERROR) << "Caught exception:- " << tresult.exception().what() << " for " + << puts[num++].row(); + throw tresult.exception(); + } + } + return; +} + } /* namespace hbase */ diff --git a/hbase-native-client/core/table.h 
b/hbase-native-client/core/table.h index cc37182..6340494 100644 --- a/hbase-native-client/core/table.h +++ b/hbase-native-client/core/table.h @@ -119,11 +119,15 @@ class Table { * @param - append Append object to perform HBase Append operation. */ std::shared_ptr<hbase::Result> Append(const hbase::Append &append); - // TODO: Batch Puts std::shared_ptr<ResultScanner> Scan(const hbase::Scan &scan); /** + * @brief - Multi Puts. + * @param - puts vector of hbase::Put. + */ + void Put(const std::vector<hbase::Put> &puts); + /** * @brief - Close the client connection. */ void Close();
