This is an automated email from the ASF dual-hosted git repository. zghao pushed a commit to branch HBASE-14850 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 1eb84c2f9049d82c5fd1ffe970ad37249b5cb22b Author: Enis Soztutar <[email protected]> AuthorDate: Fri Mar 3 19:15:16 2017 -0800 HBASE-17465 [C++] implement request retry mechanism over RPC (Xiaobing Zhou) --- hbase-native-client/Makefile | 2 +- hbase-native-client/bin/start-docker.sh | 2 +- .../connection/connection-factory.cc | 7 +- .../connection/connection-factory.h | 3 +- hbase-native-client/connection/connection-pool.cc | 4 +- hbase-native-client/connection/rpc-client.cc | 42 +--- hbase-native-client/connection/rpc-client.h | 42 +--- hbase-native-client/core/BUCK | 14 ++ .../core/async-rpc-retrying-caller-factory.cc | 22 ++ .../core/async-rpc-retrying-caller-factory.h | 124 ++++++++++ .../core/async-rpc-retrying-caller.cc | 22 ++ .../core/async-rpc-retrying-caller.h | 266 +++++++++++++++++++++ .../core/async-rpc-retrying-test.cc | 255 ++++++++++++++++++++ hbase-native-client/core/client.cc | 3 +- hbase-native-client/core/client.h | 2 +- hbase-native-client/core/filter.h | 2 +- hbase-native-client/core/hbase-rpc-controller.cc | 22 ++ hbase-native-client/core/hbase-rpc-controller.h | 56 +++++ hbase-native-client/core/location-cache.cc | 1 + hbase-native-client/core/region-location.h | 10 +- hbase-native-client/core/response_converter.cc | 1 + hbase-native-client/core/response_converter.h | 1 + hbase-native-client/core/table.cc | 4 + hbase-native-client/core/table.h | 5 + hbase-native-client/{utils => exceptions}/BUCK | 17 +- hbase-native-client/exceptions/exception.h | 104 ++++++++ hbase-native-client/utils/BUCK | 7 +- hbase-native-client/utils/connection-util.cc | 26 ++ hbase-native-client/utils/connection-util.h | 62 +++++ hbase-native-client/utils/sys-util.h | 39 +++ hbase-native-client/utils/time-util.h | 52 ++++ 31 files changed, 1113 insertions(+), 106 deletions(-) diff --git a/hbase-native-client/Makefile b/hbase-native-client/Makefile index 84ae556..b926220 100644 --- a/hbase-native-client/Makefile +++ b/hbase-native-client/Makefile @@ -22,7 +22,7 @@ 
LD:=g++ DEBUG_PATH = build/debug RELEASE_PATH = build/release PROTO_SRC_DIR = build/if -MODULES = connection core serde test-util utils security +MODULES = connection core serde test-util utils security exceptions SRC_DIR = $(MODULES) DEBUG_BUILD_DIR = $(addprefix $(DEBUG_PATH)/,$(MODULES)) RELEASE_BUILD_DIR = $(addprefix $(RELEASE_PATH)/,$(MODULES)) diff --git a/hbase-native-client/bin/start-docker.sh b/hbase-native-client/bin/start-docker.sh index 1380cdf..8b017a0 100755 --- a/hbase-native-client/bin/start-docker.sh +++ b/hbase-native-client/bin/start-docker.sh @@ -56,7 +56,7 @@ docker build -t hbase_native . # After the image is built run the thing docker run -p 16050:16050/tcp \ - -v ${BASE_DIR}/..:/usr/src/hbase \ + -v ${BASE_DIR}/..:/usr/src/hbase \ -v ~/.m2:/root/.m2 \ -it hbase_native /bin/bash popd diff --git a/hbase-native-client/connection/connection-factory.cc b/hbase-native-client/connection/connection-factory.cc index 2f7e75c..832b00f 100644 --- a/hbase-native-client/connection/connection-factory.cc +++ b/hbase-native-client/connection/connection-factory.cc @@ -31,11 +31,10 @@ using std::chrono::milliseconds; using std::chrono::nanoseconds; ConnectionFactory::ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool, - std::shared_ptr<Codec> codec, - nanoseconds connect_timeout) + std::shared_ptr<Codec> codec, nanoseconds connect_timeout) : connect_timeout_(connect_timeout), - io_pool_(io_pool), - pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {} + io_pool_(io_pool), + pipeline_factory_(std::make_shared<RpcPipelineFactory>(codec)) {} std::shared_ptr<wangle::ClientBootstrap<SerializePipeline>> ConnectionFactory::MakeBootstrap() { auto client = std::make_shared<wangle::ClientBootstrap<SerializePipeline>>(); diff --git a/hbase-native-client/connection/connection-factory.h b/hbase-native-client/connection/connection-factory.h index fbcb6ef..32d0bf7 100644 --- a/hbase-native-client/connection/connection-factory.h +++ 
b/hbase-native-client/connection/connection-factory.h @@ -44,8 +44,7 @@ class ConnectionFactory { * There should only be one ConnectionFactory per client. */ ConnectionFactory(std::shared_ptr<wangle::IOThreadPoolExecutor> io_pool, - std::shared_ptr<Codec> codec, - nanoseconds connect_timeout = nanoseconds(0)); + std::shared_ptr<Codec> codec, nanoseconds connect_timeout = nanoseconds(0)); /** Default Destructor */ virtual ~ConnectionFactory() = default; diff --git a/hbase-native-client/connection/connection-pool.cc b/hbase-native-client/connection/connection-pool.cc index b18ee89..4fe4610 100644 --- a/hbase-native-client/connection/connection-pool.cc +++ b/hbase-native-client/connection/connection-pool.cc @@ -22,6 +22,7 @@ #include <folly/SocketAddress.h> #include <wangle/service/Service.h> +#include <folly/Logging.h> #include <memory> #include <utility> @@ -34,8 +35,7 @@ using folly::SharedMutexWritePriority; using folly::SocketAddress; ConnectionPool::ConnectionPool(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, - std::shared_ptr<Codec> codec, - nanoseconds connect_timeout) + std::shared_ptr<Codec> codec, nanoseconds connect_timeout) : cf_(std::make_shared<ConnectionFactory>(io_executor, codec, connect_timeout)), clients_(), connections_(), diff --git a/hbase-native-client/connection/rpc-client.cc b/hbase-native-client/connection/rpc-client.cc index c61a73e..5fa1138 100644 --- a/hbase-native-client/connection/rpc-client.cc +++ b/hbase-native-client/connection/rpc-client.cc @@ -18,27 +18,17 @@ */ #include "connection/rpc-client.h" + +#include <folly/Logging.h> #include <unistd.h> #include <wangle/concurrent/IOThreadPoolExecutor.h> +#include <memory> +#include <string> using hbase::RpcClient; -using hbase::AbstractRpcChannel; namespace hbase { -class RpcChannelImplementation : public AbstractRpcChannel { - public: - RpcChannelImplementation(std::shared_ptr<RpcClient> rpc_client, const std::string& host, - uint16_t port, std::shared_ptr<User> ticket, int 
rpc_timeout) - : AbstractRpcChannel(rpc_client, host, port, ticket, rpc_timeout) {} - - void CallMethod(const MethodDescriptor* method, RpcController* controller, const Message* request, - Message* response, Closure* done) override { - rpc_client_->CallMethod(method, controller, request, response, done, host_, port_, ticket_); - } -}; -} // namespace hbase - RpcClient::RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, std::shared_ptr<Codec> codec, nanoseconds connect_timeout) : io_executor_(io_executor) { @@ -80,26 +70,4 @@ folly::Future<std::unique_ptr<Response>> RpcClient::AsyncCall(const std::string& std::shared_ptr<RpcConnection> RpcClient::GetConnection(std::shared_ptr<ConnectionId> remote_id) { return cp_->GetConnection(remote_id); } - -std::shared_ptr<RpcChannel> RpcClient::CreateRpcChannel(const std::string& host, uint16_t port, - std::shared_ptr<User> ticket, - int rpc_timeout) { - std::shared_ptr<RpcChannelImplementation> channel = std::make_shared<RpcChannelImplementation>( - shared_from_this(), host, port, ticket, rpc_timeout); - - /* static_pointer_cast is safe since RpcChannelImplementation derives - * from RpcChannel, otherwise, dynamic_pointer_cast should be used. 
*/ - return std::static_pointer_cast<RpcChannel>(channel); -} - -void RpcClient::CallMethod(const MethodDescriptor* method, RpcController* controller, - const Message* req_msg, Message* resp_msg, Closure* done, - const std::string& host, uint16_t port, std::shared_ptr<User> ticket) { - std::shared_ptr<Message> shared_req(const_cast<Message*>(req_msg)); - std::shared_ptr<Message> shared_resp(resp_msg); - - std::unique_ptr<Request> req = std::make_unique<Request>(shared_req, shared_resp, method->name()); - - AsyncCall(host, port, std::move(req), ticket, method->service()->name()) - .then([done, this](std::unique_ptr<Response> resp) { done->Run(); }); -} +} // namespace hbase diff --git a/hbase-native-client/connection/rpc-client.h b/hbase-native-client/connection/rpc-client.h index 5c11ab5..d416ceb 100644 --- a/hbase-native-client/connection/rpc-client.h +++ b/hbase-native-client/connection/rpc-client.h @@ -38,24 +38,15 @@ using hbase::ConnectionPool; using hbase::RpcConnection; using hbase::security::User; -using google::protobuf::MethodDescriptor; -using google::protobuf::RpcChannel; using google::protobuf::Message; -using google::protobuf::RpcController; -using google::protobuf::Closure; - using std::chrono::nanoseconds; -class RpcChannelImplementation; - namespace hbase { -class RpcClient : public std::enable_shared_from_this<RpcClient> { - friend class RpcChannelImplementation; - +class RpcClient { public: - RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, - std::shared_ptr<Codec> codec, nanoseconds connect_timeout); + RpcClient(std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor, std::shared_ptr<Codec> codec, + nanoseconds connect_timeout = nanoseconds(0)); virtual ~RpcClient() { Close(); } @@ -79,40 +70,13 @@ class RpcClient : public std::enable_shared_from_this<RpcClient> { virtual void Close(); - virtual std::shared_ptr<RpcChannel> CreateRpcChannel(const std::string &host, uint16_t port, - std::shared_ptr<User> ticket, - int 
rpc_timeout); - std::shared_ptr<ConnectionPool> connection_pool() const { return cp_; } private: - void CallMethod(const MethodDescriptor *method, RpcController *controller, const Message *req_msg, - Message *resp_msg, Closure *done, const std::string &host, uint16_t port, - std::shared_ptr<User> ticket); std::shared_ptr<RpcConnection> GetConnection(std::shared_ptr<ConnectionId> remote_id); private: std::shared_ptr<ConnectionPool> cp_; std::shared_ptr<wangle::IOThreadPoolExecutor> io_executor_; }; - -class AbstractRpcChannel : public RpcChannel { - public: - AbstractRpcChannel(std::shared_ptr<RpcClient> rpc_client, const std::string &host, uint16_t port, - std::shared_ptr<User> ticket, int rpc_timeout) - : rpc_client_(rpc_client), - host_(host), - port_(port), - ticket_(ticket), - rpc_timeout_(rpc_timeout) {} - - virtual ~AbstractRpcChannel() = default; - - protected: - std::shared_ptr<RpcClient> rpc_client_; - std::string host_; - uint16_t port_; - std::shared_ptr<User> ticket_; - int rpc_timeout_; -}; } // namespace hbase diff --git a/hbase-native-client/core/BUCK b/hbase-native-client/core/BUCK index e541d8f..2f4f6c1 100644 --- a/hbase-native-client/core/BUCK +++ b/hbase-native-client/core/BUCK @@ -40,6 +40,9 @@ cxx_library( "request_converter.h", "response_converter.h", "table.h", + "async-rpc-retrying-caller-factory.h", + "async-rpc-retrying-caller.h", + "hbase-rpc-controller.h", ], srcs=[ "cell.cc", @@ -58,6 +61,8 @@ cxx_library( "table.cc", ], deps=[ + "//exceptions:exceptions", + "//utils:utils", "//connection:connection", "//if:if", "//serde:serde", @@ -96,6 +101,15 @@ cxx_test( deps=[":core",], run_test_separately=True,) cxx_test( + name="retry-test", + srcs=["async-rpc-retrying-test.cc",], + deps=[ + ":core", + "//test-util:test-util", + "//exceptions:exceptions", + ], + run_test_separately=True,) +cxx_test( name="time_range-test", srcs=["time_range-test.cc",], deps=[":core",], diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.cc 
b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc new file mode 100644 index 0000000..0ac9cac --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/async-rpc-retrying-caller-factory.h" + +namespace hbase {} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller-factory.h b/hbase-native-client/core/async-rpc-retrying-caller-factory.h new file mode 100644 index 0000000..3342e29 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller-factory.h @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <folly/Logging.h> +#include <folly/io/IOBuf.h> +#include <folly/io/async/EventBase.h> +#include <chrono> +#include <memory> +#include <string> + +#include "connection/rpc-client.h" +#include "core/async-rpc-retrying-caller.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" + +using hbase::pb::TableName; +using std::chrono::nanoseconds; + +namespace hbase { + +template <typename CONN, typename RESP, typename RPC_CLIENT> +class SingleRequestCallerBuilder + : public std::enable_shared_from_this<SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>> { + public: + explicit SingleRequestCallerBuilder(std::shared_ptr<CONN> conn) + : conn_(conn), + table_name_(nullptr), + rpc_timeout_nanos_(0), + operation_timeout_nanos_(0), + locate_type_(RegionLocateType::kCurrent) {} + + virtual ~SingleRequestCallerBuilder() = default; + + typedef SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT> GenenericThisType; + typedef std::shared_ptr<GenenericThisType> SharedThisPtr; + + SharedThisPtr table(std::shared_ptr<TableName> table_name) { + table_name_ = table_name; + return shared_this(); + } + + SharedThisPtr rpc_timeout(nanoseconds rpc_timeout_nanos) { + rpc_timeout_nanos_ = rpc_timeout_nanos; + return shared_this(); + } + + SharedThisPtr operation_timeout(nanoseconds operation_timeout_nanos) { + operation_timeout_nanos_ = operation_timeout_nanos; + return shared_this(); + } + + SharedThisPtr row(const std::string& row) { + row_ = row; + return shared_this(); + } + + SharedThisPtr locate_type(RegionLocateType locate_type) { + 
locate_type_ = locate_type; + return shared_this(); + } + + SharedThisPtr action(Callable<RESP, RPC_CLIENT> callable) { + callable_ = callable; + return shared_this(); + } + + folly::Future<RESP> Call() { return Build()->Call(); } + + std::shared_ptr<AsyncSingleRequestRpcRetryingCaller<CONN, RESP, RPC_CLIENT>> Build() { + return std::make_shared<AsyncSingleRequestRpcRetryingCaller<CONN, RESP, RPC_CLIENT>>( + conn_, table_name_, row_, locate_type_, callable_, conn_->get_conn_conf()->GetPauseNs(), + conn_->get_conn_conf()->GetMaxRetries(), operation_timeout_nanos_, rpc_timeout_nanos_, + conn_->get_conn_conf()->GetStartLogErrorsCount()); + } + + private: + SharedThisPtr shared_this() { + return std::enable_shared_from_this<GenenericThisType>::shared_from_this(); + } + + private: + std::shared_ptr<CONN> conn_; + std::shared_ptr<TableName> table_name_; + nanoseconds rpc_timeout_nanos_; + nanoseconds operation_timeout_nanos_; + std::string row_; + RegionLocateType locate_type_; + Callable<RESP, RPC_CLIENT> callable_; +}; // end of SingleRequestCallerBuilder + +template <typename CONN> +class AsyncRpcRetryingCallerFactory { + private: + std::shared_ptr<CONN> conn_; + + public: + explicit AsyncRpcRetryingCallerFactory(std::shared_ptr<CONN> conn) : conn_(conn) {} + + virtual ~AsyncRpcRetryingCallerFactory() = default; + + template <typename RESP, typename RPC_CLIENT = hbase::RpcClient> + std::shared_ptr<SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>> Single() { + return std::make_shared<SingleRequestCallerBuilder<CONN, RESP, RPC_CLIENT>>(conn_); + } +}; + +} // namespace hbase diff --git a/hbase-native-client/core/async-rpc-retrying-caller.cc b/hbase-native-client/core/async-rpc-retrying-caller.cc new file mode 100644 index 0000000..743b6bb --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/async-rpc-retrying-caller.h" + +namespace hbase {} /* namespace hbase */ diff --git a/hbase-native-client/core/async-rpc-retrying-caller.h b/hbase-native-client/core/async-rpc-retrying-caller.h new file mode 100644 index 0000000..f7a1523 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-caller.h @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +#pragma once + +#include <folly/Format.h> +#include <folly/Logging.h> +#include <folly/futures/Future.h> +#include <folly/io/async/EventBase.h> +#include <folly/io/async/HHWheelTimer.h> +#include <algorithm> +#include <chrono> +#include <functional> +#include <memory> +#include <string> +#include <type_traits> +#include <utility> +#include <vector> +#include "connection/rpc-client.h" +#include "core/hbase-rpc-controller.h" +#include "core/region-location.h" +#include "exceptions/exception.h" +#include "if/HBase.pb.h" +#include "utils/connection-util.h" +#include "utils/sys-util.h" +#include "utils/time-util.h" + +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { + +template <typename T> +using Supplier = std::function<T()>; + +template <typename T> +using Consumer = std::function<void(T)>; + +template <typename R, typename S, typename... I> +using ReqConverter = std::function<R(const S&, const I&...)>; + +template <typename R, typename S> +using RespConverter = std::function<R(const S&)>; + +template <typename RESP> +using RpcCallback = std::function<void(const RESP&)>; + +template <typename REQ, typename RESP, typename RPC_CLIENT = hbase::RpcClient> +using RpcCall = std::function<folly::Future<std::unique_ptr<RESP>>( + std::shared_ptr<RPC_CLIENT>, std::shared_ptr<RegionLocation>, + std::shared_ptr<HBaseRpcController>, std::unique_ptr<REQ>)>; + +template <typename RESP, typename RPC_CLIENT = hbase::RpcClient> +using Callable = std::function<folly::Future<RESP>(std::shared_ptr<HBaseRpcController>, + std::shared_ptr<RegionLocation>, + std::shared_ptr<RPC_CLIENT>)>; + +template <typename CONN, typename RESP, typename RPC_CLIENT = hbase::RpcClient> +class AsyncSingleRequestRpcRetryingCaller { + public: + AsyncSingleRequestRpcRetryingCaller(std::shared_ptr<CONN> conn, + std::shared_ptr<hbase::pb::TableName> table_name, + const std::string& row, RegionLocateType locate_type, + Callable<RESP, RPC_CLIENT> callable, nanoseconds 
pause_ns, + uint32_t max_retries, nanoseconds operation_timeout_nanos, + nanoseconds rpc_timeout_nanos, + uint32_t start_log_errors_count) + : conn_(conn), + table_name_(table_name), + row_(row), + locate_type_(locate_type), + callable_(callable), + pause_ns_(pause_ns), + max_retries_(max_retries), + operation_timeout_nanos_(operation_timeout_nanos), + rpc_timeout_nanos_(rpc_timeout_nanos), + start_log_errors_count_(start_log_errors_count), + promise_(std::make_shared<folly::Promise<RESP>>()), + tries_(1) { + controller_ = conn_->get_rpc_controller_factory()->NewController(); + start_ns_ = TimeUtil::GetNowNanos(); + max_attempts_ = ConnectionUtils::Retries2Attempts(max_retries); + exceptions_ = std::make_shared<std::vector<ThrowableWithExtraContext>>(); + retry_timer_ = folly::HHWheelTimer::newTimer(&event_base_); + } + + virtual ~AsyncSingleRequestRpcRetryingCaller() {} + + folly::Future<RESP> Call() { + auto f = promise_->getFuture(); + LocateThenCall(); + return f; + } + + private: + void LocateThenCall() { + int64_t locate_timeout_ns; + if (operation_timeout_nanos_.count() > 0) { + locate_timeout_ns = RemainingTimeNs(); + if (locate_timeout_ns <= 0) { + CompleteExceptionally(); + return; + } + } else { + locate_timeout_ns = -1L; + } + + conn_->get_locator() + ->GetRegionLocation(table_name_, row_, locate_type_, locate_timeout_ns) + .then([this](RegionLocation& loc) { Call(loc); }) + .onError([this](const std::exception& e) { + OnError(e, + [this]() -> std::string { + return "Locate '" + row_ + "' in " + table_name_->namespace_() + "::" + + table_name_->qualifier() + " failed, tries = " + std::to_string(tries_) + + ", maxAttempts = " + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + + " ms"; + }, + [](const std::exception& error) {}); + }); + } + + void OnError(const std::exception& error, Supplier<std::string> err_msg, + 
Consumer<std::exception> update_cached_location) { + ThrowableWithExtraContext twec(std::make_shared<std::exception>(error), + TimeUtil::GetNowNanos()); + exceptions_->push_back(twec); + if (SysUtil::InstanceOf<DoNotRetryIOException, std::exception>(error) || + tries_ >= max_retries_) { + CompleteExceptionally(); + return; + } + + int64_t delay_ns; + if (operation_timeout_nanos_.count() > 0) { + int64_t max_delay_ns = RemainingTimeNs() - ConnectionUtils::kSleepDeltaNs; + if (max_delay_ns <= 0) { + CompleteExceptionally(); + return; + } + delay_ns = + std::min(max_delay_ns, ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1)); + } else { + delay_ns = ConnectionUtils::GetPauseTime(pause_ns_.count(), tries_ - 1); + } + update_cached_location(error); + tries_++; + retry_timer_->scheduleTimeoutFn([this]() { LocateThenCall(); }, + milliseconds(TimeUtil::ToMillis(delay_ns))); + } + + void Call(const RegionLocation& loc) { + int64_t call_timeout_ns; + if (operation_timeout_nanos_.count() > 0) { + call_timeout_ns = this->RemainingTimeNs(); + if (call_timeout_ns <= 0) { + this->CompleteExceptionally(); + return; + } + call_timeout_ns = std::min(call_timeout_ns, rpc_timeout_nanos_.count()); + } else { + call_timeout_ns = rpc_timeout_nanos_.count(); + } + + std::shared_ptr<RPC_CLIENT> rpc_client; + try { + rpc_client = conn_->GetRpcClient(); + } catch (const IOException& e) { + OnError(e, + [&, this]() -> std::string { + return "Get async rpc_client to " + + folly::sformat("{0}:{1}", loc.server_name().host_name(), + loc.server_name().port()) + + " for '" + row_ + "' in " + loc.DebugString() + " of " + + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + " ms"; + }, + [&, this](const std::exception& error) { + 
conn_->get_locator()->UpdateCachedLocation(loc, error); + }); + return; + } + + ResetController(controller_, call_timeout_ns); + + callable_(controller_, std::make_shared<RegionLocation>(loc), rpc_client) + .then([this](const RESP& resp) { this->promise_->setValue(std::move(resp)); }) + .onError([&, this](const std::exception& e) { + OnError(e, + [&, this]() -> std::string { + return "Call to " + folly::sformat("{0}:{1}", loc.server_name().host_name(), + loc.server_name().port()) + + " for '" + row_ + "' in " + loc.DebugString() + " of " + + table_name_->namespace_() + "::" + table_name_->qualifier() + + " failed, tries = " + std::to_string(tries_) + ", maxAttempts = " + + std::to_string(max_attempts_) + ", timeout = " + + TimeUtil::ToMillisStr(this->operation_timeout_nanos_) + + " ms, time elapsed = " + TimeUtil::ElapsedMillisStr(this->start_ns_) + + " ms"; + }, + [&, this](const std::exception& error) { + conn_->get_locator()->UpdateCachedLocation(loc, error); + }); + return; + }); + } + + void CompleteExceptionally() { + this->promise_->setException(RetriesExhaustedException(tries_ - 1, exceptions_)); + } + + int64_t RemainingTimeNs() { + return operation_timeout_nanos_.count() - (TimeUtil::GetNowNanos() - start_ns_); + } + + static void ResetController(std::shared_ptr<HBaseRpcController> controller, + const int64_t& timeout_ns) { + controller->Reset(); + if (timeout_ns >= 0) { + controller->set_call_timeout( + milliseconds(std::min(static_cast<int64_t>(INT_MAX), TimeUtil::ToMillis(timeout_ns)))); + } + } + + private: + folly::HHWheelTimer::UniquePtr retry_timer_; + std::shared_ptr<CONN> conn_; + std::shared_ptr<hbase::pb::TableName> table_name_; + std::string row_; + RegionLocateType locate_type_; + Callable<RESP, RPC_CLIENT> callable_; + nanoseconds pause_ns_; + uint32_t max_retries_; + nanoseconds operation_timeout_nanos_; + nanoseconds rpc_timeout_nanos_; + uint32_t start_log_errors_count_; + std::shared_ptr<folly::Promise<RESP>> promise_; + 
std::shared_ptr<HBaseRpcController> controller_; + uint64_t start_ns_; + uint32_t tries_; + std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions_; + uint32_t max_attempts_; + folly::EventBase event_base_; +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/async-rpc-retrying-test.cc b/hbase-native-client/core/async-rpc-retrying-test.cc new file mode 100644 index 0000000..a9b0017 --- /dev/null +++ b/hbase-native-client/core/async-rpc-retrying-test.cc @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include <folly/Logging.h> +#include <folly/Memory.h> +#include <folly/futures/Future.h> +#include <gmock/gmock.h> +#include <google/protobuf/stubs/callback.h> +#include <wangle/concurrent/IOThreadPoolExecutor.h> + +#include <functional> +#include <string> + +#include "connection/request.h" +#include "connection/response.h" +#include "connection/rpc-client.h" +#include "core/async-rpc-retrying-caller-factory.h" +#include "core/async-rpc-retrying-caller.h" +#include "core/client.h" +#include "core/hbase-rpc-controller.h" +#include "core/keyvalue-codec.h" +#include "core/region-location.h" +#include "core/request_converter.h" +#include "core/response_converter.h" +#include "core/result.h" +#include "exceptions/exception.h" +#include "if/Client.pb.h" +#include "if/HBase.pb.h" +#include "test-util/test-util.h" + +using namespace google::protobuf; +using namespace hbase; +using namespace hbase::pb; +using namespace std::placeholders; +using namespace testing; +using ::testing::Return; +using ::testing::_; +using std::chrono::nanoseconds; + +class MockRpcControllerFactory { + public: + MOCK_METHOD0(NewController, std::shared_ptr<HBaseRpcController>()); +}; + +class MockAsyncConnectionConfiguration { + public: + MOCK_METHOD0(GetPauseNs, nanoseconds()); + MOCK_METHOD0(GetMaxRetries, int32_t()); + MOCK_METHOD0(GetStartLogErrorsCount, int32_t()); + MOCK_METHOD0(GetReadRpcTimeoutNs, nanoseconds()); + MOCK_METHOD0(GetOperationTimeoutNs, nanoseconds()); +}; + +class AsyncRegionLocator { + public: + explicit AsyncRegionLocator(std::shared_ptr<RegionLocation> region_location) + : region_location_(region_location) {} + ~AsyncRegionLocator() = default; + + folly::Future<RegionLocation> GetRegionLocation(std::shared_ptr<hbase::pb::TableName>, + const std::string&, RegionLocateType, int64_t) { + folly::Promise<RegionLocation> promise; + promise.setValue(*region_location_); + return promise.getFuture(); + } + + void UpdateCachedLocation(const RegionLocation&, const
std::exception&) {} + + private: + std::shared_ptr<RegionLocation> region_location_; +}; + +class MockAsyncConnection { + public: + MOCK_METHOD0(get_conn_conf, std::shared_ptr<MockAsyncConnectionConfiguration>()); + MOCK_METHOD0(get_rpc_controller_factory, std::shared_ptr<MockRpcControllerFactory>()); + MOCK_METHOD0(get_locator, std::shared_ptr<AsyncRegionLocator>()); + MOCK_METHOD0(GetRpcClient, std::shared_ptr<hbase::RpcClient>()); +}; + +template <typename CONN> +class MockRawAsyncTableImpl { + public: + explicit MockRawAsyncTableImpl(std::shared_ptr<CONN> conn) + : conn_(conn), promise_(std::make_shared<folly::Promise<hbase::Result>>()) {} + virtual ~MockRawAsyncTableImpl() = default; + + /* implement this in real RawAsyncTableImpl. */ + + /* in real RawAsyncTableImpl, this should be private. */ + folly::Future<hbase::Result> GetCall(std::shared_ptr<hbase::RpcClient> rpc_client, + std::shared_ptr<HBaseRpcController> controller, + std::shared_ptr<RegionLocation> loc, const hbase::Get& get) { + hbase::RpcCall<hbase::Request, hbase::Response, hbase::RpcClient> rpc_call = []( + std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<RegionLocation> loc, + std::shared_ptr<HBaseRpcController> controller, + std::unique_ptr<hbase::Request> preq) -> folly::Future<std::unique_ptr<hbase::Response>> { + return rpc_client->AsyncCall(loc->server_name().host_name(), loc->server_name().port(), + std::move(preq), User::defaultUser(), "ClientService"); + }; + + return Call<hbase::Get, hbase::Request, hbase::Response, hbase::Result>( + rpc_client, controller, loc, get, &hbase::RequestConverter::ToGetRequest, rpc_call, + &hbase::ResponseConverter::FromGetResponse); + } + + /* in real RawAsyncTableImpl, this should be private.
*/ + template <typename REQ, typename PREQ, typename PRESP, typename RESP> + folly::Future<RESP> Call( + std::shared_ptr<hbase::RpcClient> rpc_client, std::shared_ptr<HBaseRpcController> controller, + std::shared_ptr<RegionLocation> loc, const REQ& req, + const ReqConverter<std::unique_ptr<PREQ>, REQ, std::string>& req_converter, + const hbase::RpcCall<PREQ, PRESP, hbase::RpcClient>& rpc_call, + const RespConverter<std::unique_ptr<RESP>, PRESP>& resp_converter) { + auto promise = std::make_shared<folly::Promise<hbase::Result>>(); // one promise per attempt: a retry calls Call() again and a shared promise can only be satisfied/retrieved once + rpc_call(rpc_client, loc, controller, std::move(req_converter(req, loc->region_name()))) + .then([promise](std::unique_ptr<PRESP> presp) { + promise->setValue(std::move(*hbase::ResponseConverter::FromGetResponse(*presp))); + }) + .onError([promise](const folly::exception_wrapper& ew) { promise->setException(ew); }); // exception_wrapper keeps the dynamic type and message + return promise->getFuture(); + } + + private: + std::shared_ptr<CONN> conn_; + std::shared_ptr<folly::Promise<hbase::Result>> promise_; // no longer used by Call(), which creates a promise per attempt +}; + +TEST(AsyncRpcRetryTest, TestGetBasic) { + // Remove already configured env if present.
+ unsetenv("HBASE_CONF"); + + // Using TestUtil to populate test data + auto test_util = std::make_unique<hbase::TestUtil>(); + test_util->RunShellCmd("create 't', 'd'"); + test_util->RunShellCmd("put 't', 'test2', 'd:2', 'value2'"); + test_util->RunShellCmd("put 't', 'test2', 'd:extra', 'value for extra'"); + + // Create TableName and Row to be fetched from HBase + auto tn = folly::to<hbase::pb::TableName>("t"); + auto row = "test2"; + + // Get to be performed on above HBase Table + hbase::Get get(row); + + // Create Configuration + hbase::Configuration conf; + + // Create a client + Client client(conf); + + // Get connection to HBase Table + auto table = client.Table(tn); + ASSERT_TRUE(table) << "Unable to get connection to Table."; + + /* init region location and rpc channel */ + auto region_location = table->GetRegionLocation(row); + + auto io_executor_ = std::make_shared<wangle::IOThreadPoolExecutor>(1); + auto codec = std::make_shared<hbase::KeyValueCodec>(); + auto rpc_client = std::make_shared<RpcClient>(io_executor_, codec); + + /* init rpc controller */ + auto controller = std::make_shared<HBaseRpcController>(); + + /* init rpc controller factory */ + auto controller_factory = std::make_shared<MockRpcControllerFactory>(); + EXPECT_CALL((*controller_factory), NewController()).Times(1).WillRepeatedly(Return(controller)); + + /* init connection configuration */ + auto connection_conf = std::make_shared<MockAsyncConnectionConfiguration>(); + EXPECT_CALL((*connection_conf), GetPauseNs()) + .Times(1) + .WillRepeatedly(Return(nanoseconds(100000000))); + EXPECT_CALL((*connection_conf), GetMaxRetries()).Times(1).WillRepeatedly(Return(31)); + EXPECT_CALL((*connection_conf), GetStartLogErrorsCount()).Times(1).WillRepeatedly(Return(9)); + EXPECT_CALL((*connection_conf), GetReadRpcTimeoutNs()) + .Times(1) + .WillRepeatedly(Return(nanoseconds(60000000000))); + EXPECT_CALL((*connection_conf), GetOperationTimeoutNs()) + .Times(1) +
.WillRepeatedly(Return(nanoseconds(1200000000000))); + + /* init region locator */ + auto region_locator = std::make_shared<AsyncRegionLocator>(region_location); + + /* init hbase client connection */ + auto conn = std::make_shared<MockAsyncConnection>(); + EXPECT_CALL((*conn), get_conn_conf()).Times(AtLeast(1)).WillRepeatedly(Return(connection_conf)); + EXPECT_CALL((*conn), get_rpc_controller_factory()) + .Times(AtLeast(1)) + .WillRepeatedly(Return(controller_factory)); + EXPECT_CALL((*conn), get_locator()).Times(AtLeast(1)).WillRepeatedly(Return(region_locator)); + EXPECT_CALL((*conn), GetRpcClient()).Times(AtLeast(1)).WillRepeatedly(Return(rpc_client)); + + /* init retry caller factory */ + auto tableImpl = std::make_shared<MockRawAsyncTableImpl<MockAsyncConnection>>(conn); + AsyncRpcRetryingCallerFactory<MockAsyncConnection> caller_factory(conn); + + /* init request caller builder */ + auto builder = caller_factory.Single<hbase::Result>(); + + /* call with retry to get result */ + try { + auto async_caller = + builder->table(std::make_shared<TableName>(tn)) + ->row(row) + ->rpc_timeout(conn->get_conn_conf()->GetReadRpcTimeoutNs()) + ->operation_timeout(conn->get_conn_conf()->GetOperationTimeoutNs()) + ->action( + [=, &get]( + std::shared_ptr<hbase::HBaseRpcController> controller, + std::shared_ptr<hbase::RegionLocation> loc, + std::shared_ptr<hbase::RpcClient> rpc_client) -> folly::Future<hbase::Result> { + return tableImpl->GetCall(rpc_client, controller, loc, get); + }) + ->Build(); + + hbase::Result result = async_caller->Call().get(); + + /*Stopping the connection as we are getting segfault due to some folly issue + The connection stays open and we don't want that. + So we are stopping the connection.
+ We can remove this once we have fixed the folly part */ + test_util.reset(); + + // Test the values, should be same as in put executed on hbase shell + ASSERT_TRUE(!result.IsEmpty()) << "Result shouldn't be empty."; + EXPECT_EQ("test2", result.Row()); + EXPECT_EQ("value2", *(result.Value("d", "2"))); + EXPECT_EQ("value for extra", *(result.Value("d", "extra"))); + } catch (const std::exception& e) { + LOG(ERROR) << e.what(); + throw; // rethrow the original object; `throw e` would slice it to std::exception + } + + table->Close(); + client.Close(); +} diff --git a/hbase-native-client/core/client.cc b/hbase-native-client/core/client.cc index 240da72..f0483ef 100644 --- a/hbase-native-client/core/client.cc +++ b/hbase-native-client/core/client.cc @@ -57,7 +57,8 @@ void Client::init(const hbase::Configuration &conf) { } else { LOG(WARNING) << "Not using RPC Cell Codec"; } - rpc_client_ = std::make_shared<hbase::RpcClient>(io_executor_, codec, conn_conf_->connect_timeout()); + rpc_client_ = + std::make_shared<hbase::RpcClient>(io_executor_, codec, conn_conf_->connect_timeout()); location_cache_ = std::make_shared<hbase::LocationCache>(conf_, cpu_executor_, rpc_client_->connection_pool()); } diff --git a/hbase-native-client/core/client.h b/hbase-native-client/core/client.h index a96d6f3..e73ab70 100644 --- a/hbase-native-client/core/client.h +++ b/hbase-native-client/core/client.h @@ -89,7 +89,7 @@ class Client { bool is_closed_ = false; /** Methods */ - void init(const hbase::Configuration &conf); + void init(const hbase::Configuration& conf); }; } // namespace hbase diff --git a/hbase-native-client/core/filter.h b/hbase-native-client/core/filter.h index b5b7133..10accaa 100644 --- a/hbase-native-client/core/filter.h +++ b/hbase-native-client/core/filter.h @@ -20,9 +20,9 @@ #pragma once #include <memory> +#include <set> #include <string> #include <utility> -#include <set> #include <vector> #include "if/Comparator.pb.h" diff --git a/hbase-native-client/core/hbase-rpc-controller.cc b/hbase-native-client/core/hbase-rpc-controller.cc new file mode
100644 index 0000000..bc53781 --- /dev/null +++ b/hbase-native-client/core/hbase-rpc-controller.cc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "core/hbase-rpc-controller.h" + +namespace hbase {} /* namespace hbase */ diff --git a/hbase-native-client/core/hbase-rpc-controller.h b/hbase-native-client/core/hbase-rpc-controller.h new file mode 100644 index 0000000..661c810 --- /dev/null +++ b/hbase-native-client/core/hbase-rpc-controller.h @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <google/protobuf/service.h> +#include <chrono> +#include <string> + +using google::protobuf::RpcController; +using google::protobuf::Closure; + +using std::chrono::milliseconds; + +namespace hbase { + +class HBaseRpcController : public RpcController { + public: + HBaseRpcController() {} + virtual ~HBaseRpcController() = default; + + void set_call_timeout(const milliseconds& call_timeout) { + // TODO: + } + + void Reset() override {} + + bool Failed() const override { return false; } + + std::string ErrorText() const override { return ""; } + + void StartCancel() override {} + + void SetFailed(const std::string& reason) override {} + + bool IsCanceled() const override { return false; } + + void NotifyOnCancel(Closure* callback) override {} +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/core/location-cache.cc b/hbase-native-client/core/location-cache.cc index da9f64a..17032fe 100644 --- a/hbase-native-client/core/location-cache.cc +++ b/hbase-native-client/core/location-cache.cc @@ -25,6 +25,7 @@ #include <utility> +#include <folly/Logging.h> #include "connection/response.h" #include "connection/rpc-connection.h" #include "if/Client.pb.h" diff --git a/hbase-native-client/core/region-location.h b/hbase-native-client/core/region-location.h index b0411cb..4087d94 100644 --- a/hbase-native-client/core/region-location.h +++ b/hbase-native-client/core/region-location.h @@ -26,6 +26,8 @@ namespace hbase { +enum RegionLocateType { kBefore, kCurrent, kAfter }; + /** * @brief class to hold where a region is located.
* @@ -49,17 +51,17 @@ class RegionLocation { /** * Get a reference to the regio info */ - const hbase::pb::RegionInfo &region_info() { return ri_; } + const hbase::pb::RegionInfo &region_info() const { return ri_; } /** * Get a reference to the server name */ - const hbase::pb::ServerName &server_name() { return sn_; } + const hbase::pb::ServerName &server_name() const { return sn_; } /** * Get a reference to the region name. */ - const std::string &region_name() { return region_name_; } + const std::string &region_name() const { return region_name_; } /** * Get a service. This could be closed or null. It's the caller's @@ -79,7 +81,7 @@ class RegionLocation { */ void set_server_name(hbase::pb::ServerName sn) { sn_ = sn; } - const std::string DebugString() { + const std::string DebugString() const { return "region_info:" + ri_.ShortDebugString() + ", server_name:" + sn_.ShortDebugString(); } diff --git a/hbase-native-client/core/response_converter.cc b/hbase-native-client/core/response_converter.cc index 19a3554..2497306 100644 --- a/hbase-native-client/core/response_converter.cc +++ b/hbase-native-client/core/response_converter.cc @@ -19,6 +19,7 @@ #include "core/response_converter.h" +#include <string> #include <vector> #include "core/cell.h" diff --git a/hbase-native-client/core/response_converter.h b/hbase-native-client/core/response_converter.h index 859644b..759b1ce 100644 --- a/hbase-native-client/core/response_converter.h +++ b/hbase-native-client/core/response_converter.h @@ -20,6 +20,7 @@ #pragma once #include <memory> +#include <vector> #include "connection/response.h" #include "core/result.h" #include "if/Client.pb.h" diff --git a/hbase-native-client/core/table.cc b/hbase-native-client/core/table.cc index 4e30d4b..ba4dc29 100644 --- a/hbase-native-client/core/table.cc +++ b/hbase-native-client/core/table.cc @@ -71,4 +71,8 @@ void Table::Close() { is_closed_ = true; } +std::shared_ptr<RegionLocation> Table::GetRegionLocation(const std::string &row) { + return
location_cache_->LocateRegion(*table_name_, row).get(); +} + } /* namespace hbase */ diff --git a/hbase-native-client/core/table.h b/hbase-native-client/core/table.h index 0e98cd2..f82382e 100644 --- a/hbase-native-client/core/table.h +++ b/hbase-native-client/core/table.h @@ -57,6 +57,11 @@ class Table { */ void Close(); + /** + * @brief - Get region location for a row in current table. + */ + std::shared_ptr<RegionLocation> GetRegionLocation(const std::string &row); + private: std::shared_ptr<TableName> table_name_; std::shared_ptr<hbase::LocationCache> location_cache_; diff --git a/hbase-native-client/utils/BUCK b/hbase-native-client/exceptions/BUCK similarity index 71% copy from hbase-native-client/utils/BUCK copy to hbase-native-client/exceptions/BUCK index 796f2f5..a23654c 100644 --- a/hbase-native-client/utils/BUCK +++ b/hbase-native-client/exceptions/BUCK @@ -16,14 +16,9 @@ # limitations under the License. cxx_library( - name="utils", - exported_headers=["user-util.h", "version.h"], - srcs=["user-util.cc",], - deps=['//third-party:folly',], - tests=[":user-util-test"], - visibility=['PUBLIC',], - compiler_flags=['-Weffc++'],) -cxx_test( - name="user-util-test", - srcs=["user-util-test.cc",], - deps=[":utils",],) + name="exceptions", + exported_headers=["exception.h",], + srcs=[], + deps=["//third-party:folly",], + compiler_flags=['-Weffc++'], + visibility=['//core/...'],) \ No newline at end of file diff --git a/hbase-native-client/exceptions/exception.h b/hbase-native-client/exceptions/exception.h new file mode 100644 index 0000000..c0c4142 --- /dev/null +++ b/hbase-native-client/exceptions/exception.h @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <exception> +#include <memory> +#include <stdexcept> +#include <string> +#include <vector> +#include <folly/io/IOBuf.h> + +namespace hbase { + +class ThrowableWithExtraContext { +public: + ThrowableWithExtraContext(std::shared_ptr<std::exception> cause, + const long& when) : + cause_(cause), when_(when), extras_("") { + } + + ThrowableWithExtraContext(std::shared_ptr<std::exception> cause, + const long& when, const std::string& extras) : + cause_(cause), when_(when), extras_(extras) { + } + + std::string ToString() const { + // TODO: + // return new Date(this.when).toString() + ", " + extras + ", " + t.toString(); + return extras_ + ", " + (cause_ ? cause_->what() : ""); + } + + std::shared_ptr<std::exception> cause() const { + return cause_; + } +private: + std::shared_ptr<std::exception> cause_; + long when_; + std::string extras_; +}; + +class IOException: public std::logic_error { +public: + IOException( + const std::string& what) : + logic_error(what), cause_(nullptr) {} + IOException( + const std::string& what, + std::shared_ptr<std::exception> cause) : + logic_error(what), cause_(cause) {} + virtual ~IOException() = default; + + std::shared_ptr<std::exception> cause() const { + return cause_; + } +private: + const std::shared_ptr<std::exception> cause_; +}; + +class RetriesExhaustedException: public IOException { +public: + RetriesExhaustedException( + const int& num_retries, +
std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) : + IOException( + GetMessage(num_retries, exceptions), + (!exceptions || exceptions->empty()) ? nullptr : exceptions->back().cause()){ + } + virtual ~RetriesExhaustedException() = default; + +private: + static std::string GetMessage( // static: runs in the ctor-initializer, before the IOException base exists + const int& num_retries, + std::shared_ptr<std::vector<ThrowableWithExtraContext>> exceptions) { + std::string buffer("Failed after attempts="); + buffer.append(std::to_string(num_retries + 1)); + buffer.append(", exceptions:\n"); + if (exceptions) { + for (const auto& e : *exceptions) { + buffer.append(e.ToString()); + buffer.append("\n"); + } + } + return buffer; + } +}; + +class HBaseIOException : public IOException { +public: + using IOException::IOException; +}; + +class DoNotRetryIOException : public HBaseIOException { +public: + using HBaseIOException::HBaseIOException; +}; +} // namespace hbase diff --git a/hbase-native-client/utils/BUCK b/hbase-native-client/utils/BUCK index 796f2f5..eae929e 100644 --- a/hbase-native-client/utils/BUCK +++ b/hbase-native-client/utils/BUCK @@ -17,8 +17,11 @@ cxx_library( name="utils", - exported_headers=["user-util.h", "version.h"], - srcs=["user-util.cc",], + exported_headers=[ + "user-util.h", "version.h", "connection-util.h", "sys-util.h", + "time-util.h" + ], + srcs=["user-util.cc", "connection-util.cc"], deps=['//third-party:folly',], tests=[":user-util-test"], visibility=['PUBLIC',], diff --git a/hbase-native-client/utils/connection-util.cc b/hbase-native-client/utils/connection-util.cc new file mode 100644 index 0000000..76689bf --- /dev/null +++ b/hbase-native-client/utils/connection-util.cc @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "utils/connection-util.h" + +namespace hbase { + +const std::vector<uint32_t> ConnectionUtils::kRetryBackoff = {1, 2, 3, 5, 10, 20, 40, + 100, 100, 100, 100, 200, 200}; +} /* namespace hbase */ diff --git a/hbase-native-client/utils/connection-util.h b/hbase-native-client/utils/connection-util.h new file mode 100644 index 0000000..f52c2f9 --- /dev/null +++ b/hbase-native-client/utils/connection-util.h @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include <algorithm> +#include <climits> +#include <cstdint> +#include <cstdlib> +#include <memory> +#include <vector> +#include "utils/time-util.h" + +namespace hbase { +class ConnectionUtils { + public: + static int Retries2Attempts(const int& retries) { + return std::max(1, retries == INT_MAX ?
INT_MAX : retries + 1); + } + + /* Add a delta to avoid timeout immediately after a retry sleeping. */ + static const uint64_t kSleepDeltaNs = 1000000; + + static const std::vector<uint32_t> kRetryBackoff; + /** + * Calculate pause time. Built on {@link kRetryBackoff}. + * @param pause time to pause + * @param tries amount of tries + * @return How long to wait after <code>tries</code> retries + */ + static int64_t GetPauseTime(const int64_t& pause, const int32_t& tries) { + int32_t ntries = tries; + if (ntries < 0) { // clamp negatives first: the size_t cast below would turn them into huge values (max backoff) + ntries = 0; + } + if (static_cast<size_t>(ntries) >= kRetryBackoff.size()) { + ntries = static_cast<int32_t>(kRetryBackoff.size()) - 1; + } + + int64_t normal_pause = pause * kRetryBackoff[ntries]; + // 1% possible jitter + float r = static_cast<float>(std::rand()) / static_cast<float>(RAND_MAX); + int64_t jitter = static_cast<int64_t>(normal_pause * r * 0.01f); + return normal_pause + jitter; + } +}; +} /* namespace hbase */ diff --git a/hbase-native-client/utils/sys-util.h b/hbase-native-client/utils/sys-util.h new file mode 100644 index 0000000..68f00d7 --- /dev/null +++ b/hbase-native-client/utils/sys-util.h @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ + +#pragma once + +#include <type_traits> + +namespace hbase { + +class SysUtil { + public: + template <class BASE, typename DERIVED> + static bool InstanceOf(const DERIVED& object) { + return dynamic_cast<const BASE*>(&object) != nullptr; // true iff object's dynamic type is (derived from) BASE; runtime check, so not constexpr + } + + template <typename BASE, typename DERIVED> + static constexpr bool InstanceOf() { + return std::is_base_of<BASE, DERIVED>(); + } +}; + +} /* namespace hbase */ diff --git a/hbase-native-client/utils/time-util.h b/hbase-native-client/utils/time-util.h new file mode 100644 index 0000000..bbc3b35 --- /dev/null +++ b/hbase-native-client/utils/time-util.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ + +#pragma once + +#include <chrono> +#include <cstdint> +#include <string> +using std::chrono::nanoseconds; +using std::chrono::milliseconds; + +namespace hbase { +class TimeUtil { + public: + static int64_t ToMillis(const int64_t& nanos) { + return std::chrono::duration_cast<milliseconds>(nanoseconds(nanos)).count(); + } + + static std::string ToMillisStr(const nanoseconds& nanos) { + return std::to_string(std::chrono::duration_cast<milliseconds>(nanos).count()); + } + + static int64_t GetNowNanos() { + // steady_clock is monotonic, so elapsed/deadline math is immune to wall-clock jumps; the value is not an epoch timestamp. + auto duration = std::chrono::steady_clock::now().time_since_epoch(); + return std::chrono::duration_cast<nanoseconds>(duration).count(); + } + + static int64_t ElapsedMillis(const int64_t& start_ns) { + return std::chrono::duration_cast<milliseconds>(nanoseconds(GetNowNanos() - start_ns)).count(); + } + + static std::string ElapsedMillisStr(const int64_t& start_ns) { + return std::to_string( + std::chrono::duration_cast<milliseconds>(nanoseconds(GetNowNanos() - start_ns)).count()); + } +}; +} /* namespace hbase */
