http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h new file mode 100644 index 0000000..12f7f0e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection_impl.h @@ -0,0 +1,445 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_RPC_RPC_CONNECTION_IMPL_H_ +#define LIB_RPC_RPC_CONNECTION_IMPL_H_ + +#include "rpc_connection.h" +#include "rpc_engine.h" +#include "request.h" + +#include "common/auth_info.h" +#include "common/logging.h" +#include "common/util.h" +#include "common/libhdfs_events_impl.h" + +#include <asio/connect.hpp> +#include <asio/read.hpp> +#include <asio/write.hpp> + +#include <system_error> + +namespace hdfs { + +template <class Socket> +class RpcConnectionImpl : public RpcConnection { +public: + MEMCHECKED_CLASS(RpcConnectionImpl); + + RpcConnectionImpl(RpcEngine *engine); + virtual ~RpcConnectionImpl() override; + + virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, + const AuthInfo & auth_info, + RpcCallback &handler); + virtual void ConnectAndFlush( + const std::vector<::asio::ip::tcp::endpoint> &server) override; + virtual void SendHandshake(RpcCallback &handler) override; + virtual void SendContext(RpcCallback &handler) override; + virtual void Disconnect() override; + virtual void OnSendCompleted(const ::asio::error_code &ec, + size_t transferred) override; + virtual void OnRecvCompleted(const ::asio::error_code &ec, + size_t transferred) override; + virtual void FlushPendingRequests() override; + + + Socket &TEST_get_mutable_socket() { return socket_; } + + void TEST_set_connected(bool connected) { connected_ = connected ? 
kConnected : kNotYetConnected; } + + private: + const Options options_; + ::asio::ip::tcp::endpoint current_endpoint_; + std::vector<::asio::ip::tcp::endpoint> additional_endpoints_; + Socket socket_; + ::asio::deadline_timer connect_timer_; + + void ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint &remote); +}; + +template <class Socket> +RpcConnectionImpl<Socket>::RpcConnectionImpl(RpcEngine *engine) + : RpcConnection(engine), + options_(engine->options()), + socket_(engine->io_service()), + connect_timer_(engine->io_service()) +{ + LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called &" << (void*)this); +} + +template <class Socket> +RpcConnectionImpl<Socket>::~RpcConnectionImpl() { + LOG_DEBUG(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called &" << (void*)this); + + if (pending_requests_.size() > 0) + LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the pending queue"); + if (requests_on_fly_.size() > 0) + LOG_WARN(kRPC, << "RpcConnectionImpl::~RpcConnectionImpl called with items in the requests_on_fly queue"); +} + +template <class Socket> +void RpcConnectionImpl<Socket>::Connect( + const std::vector<::asio::ip::tcp::endpoint> &server, + const AuthInfo & auth_info, + RpcCallback &handler) { + LOG_TRACE(kRPC, << "RpcConnectionImpl::Connect called"); + + this->auth_info_ = auth_info; + + auto connectionSuccessfulReq = std::make_shared<Request>( + engine_, [handler](::google::protobuf::io::CodedInputStream *is, + const Status &status) { + (void)is; + handler(status); + }); + pending_requests_.push_back(connectionSuccessfulReq); + this->ConnectAndFlush(server); // need "this" so the compiler can resolve the call to ConnectAndFlush +} + +template <class Socket> +void RpcConnectionImpl<Socket>::ConnectAndFlush( + const std::vector<::asio::ip::tcp::endpoint> &server) { + + LOG_INFO(kRPC, << "ConnectAndFlush called"); + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + + if (server.empty()) { + Status s = Status::InvalidArgument("No endpoints provided"); + CommsError(s); + return; + } + + if (connected_ == kConnected) { + FlushPendingRequests(); + return; + } + if (connected_ != kNotYetConnected) { + LOG_WARN(kRPC, << "RpcConnectionImpl::ConnectAndFlush called while connected=" << ToString(connected_)); + return; + } + connected_ = kConnecting; + + // Take the first endpoint, but remember the alternatives for later + additional_endpoints_ = server; + ::asio::ip::tcp::endpoint first_endpoint = additional_endpoints_.front(); + additional_endpoints_.erase(additional_endpoints_.begin()); + current_endpoint_ = first_endpoint; + + auto shared_this = shared_from_this(); + socket_.async_connect(first_endpoint, [shared_this, this, first_endpoint](const ::asio::error_code &ec) { + ConnectComplete(ec, first_endpoint); + }); + + // Arm the connect timer; if it expires before the connect completes, report a timeout + auto weak_this = std::weak_ptr<RpcConnection>(shared_this); + connect_timer_.expires_from_now( + std::chrono::milliseconds(options_.rpc_connect_timeout)); + connect_timer_.async_wait([shared_this, this, first_endpoint](const ::asio::error_code &ec) { + if (ec) + ConnectComplete(ec, first_endpoint); + else + ConnectComplete(make_error_code(asio::error::host_unreachable), first_endpoint); + }); +} + +template <class Socket> +void RpcConnectionImpl<Socket>::ConnectComplete(const ::asio::error_code &ec, const ::asio::ip::tcp::endpoint & remote) { + auto shared_this = RpcConnectionImpl<Socket>::shared_from_this(); + std::lock_guard<std::mutex> state_lock(connection_state_lock_); +
connect_timer_.cancel(); + + LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called"); + + // Could be an old async connect returning a result after we've moved on + if (remote != current_endpoint_) { + LOG_DEBUG(kRPC, << "Got ConnectComplete for " << remote << " but current_endpoint_ is " << current_endpoint_); + return; + } + if (connected_ != kConnecting) { + LOG_DEBUG(kRPC, << "Got ConnectComplete but current state is " << connected_); + return; + } + + Status status = ToStatus(ec); + if(event_handlers_) { + event_response event_resp = event_handlers_->call(FS_NN_CONNECT_EVENT, cluster_name_.c_str(), 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (event_resp.response() == event_response::kTest_Error) { + status = event_resp.status(); + } +#endif + } + + if (status.ok()) { + StartReading(); + SendHandshake([shared_this, this](const Status & s) { + HandshakeComplete(s); + }); + } else { + LOG_DEBUG(kRPC, << "Rpc connection failed; err=" << status.ToString()); + std::string err = SafeDisconnect(get_asio_socket_ptr(&socket_)); + if(!err.empty()) { + LOG_INFO(kRPC, << "Rpc connection failed to connect to endpoint, error closing connection: " << err); + } + + if (!additional_endpoints_.empty()) { + // If we have additional endpoints, keep trying until we either run out or + // one succeeds + ::asio::ip::tcp::endpoint next_endpoint = additional_endpoints_.front(); + additional_endpoints_.erase(additional_endpoints_.begin()); + current_endpoint_ = next_endpoint; + + socket_.async_connect(next_endpoint, [shared_this, this, next_endpoint](const ::asio::error_code &ec) { + ConnectComplete(ec, next_endpoint); + }); + connect_timer_.expires_from_now( + std::chrono::milliseconds(options_.rpc_connect_timeout)); + connect_timer_.async_wait([shared_this, this, next_endpoint](const ::asio::error_code &ec) { + if (ec) + ConnectComplete(ec, next_endpoint); + else + ConnectComplete(make_error_code(asio::error::host_unreachable), next_endpoint); + }); + } else { + CommsError(status); + } + } +} + +template <class Socket> +void RpcConnectionImpl<Socket>::SendHandshake(RpcCallback &handler) { + assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + + LOG_TRACE(kRPC, << "RpcConnectionImpl::SendHandshake called"); + connected_ = kHandshaking; + + auto shared_this = shared_from_this(); + auto handshake_packet = PrepareHandshakePacket(); + ::asio::async_write(socket_, asio::buffer(*handshake_packet), + [handshake_packet, handler, shared_this, this]( + const ::asio::error_code &ec, size_t) { + Status status = ToStatus(ec); + handler(status); + }); +} + +template <class Socket> +void RpcConnectionImpl<Socket>::SendContext(RpcCallback &handler) { + assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + + LOG_TRACE(kRPC, << "RpcConnectionImpl::SendContext called"); + + auto shared_this = shared_from_this(); + auto context_packet = PrepareContextPacket(); + ::asio::async_write(socket_, asio::buffer(*context_packet), + [context_packet, handler, shared_this, this]( + const ::asio::error_code &ec, size_t) { + Status status = ToStatus(ec); + handler(status); + }); +} + +template <class Socket> +void RpcConnectionImpl<Socket>::OnSendCompleted(const ::asio::error_code &ec, + size_t) { + using std::placeholders::_1; + using std::placeholders::_2; + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + + LOG_TRACE(kRPC, << "RpcConnectionImpl::OnSendCompleted called"); + + request_over_the_wire_.reset(); + if (ec) { + LOG_WARN(kRPC, << "Network
error during RPC write: " << ec.message()); + CommsError(ToStatus(ec)); + return; + } + + FlushPendingRequests(); +} + +template <class Socket> +void RpcConnectionImpl<Socket>::FlushPendingRequests() { + using namespace ::std::placeholders; + + // Lock should be held + assert(lock_held(connection_state_lock_)); + + LOG_TRACE(kRPC, << "RpcConnectionImpl::FlushPendingRequests called"); + + // Don't send if we don't need to + if (request_over_the_wire_) { + return; + } + + std::shared_ptr<Request> req; + switch (connected_) { + case kNotYetConnected: + return; + case kConnecting: + return; + case kHandshaking: + return; + case kAuthenticating: + if (auth_requests_.empty()) { + return; + } + req = auth_requests_.front(); + auth_requests_.erase(auth_requests_.begin()); + break; + case kConnected: + if (pending_requests_.empty()) { + return; + } + req = pending_requests_.front(); + pending_requests_.erase(pending_requests_.begin()); + break; + case kDisconnected: + LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests attempted to flush a " << ToString(connected_) << " connection"); + return; + default: + LOG_DEBUG(kRPC, << "RpcConnectionImpl::FlushPendingRequests invalid state: " << ToString(connected_)); + return; + } + + std::shared_ptr<RpcConnection> shared_this = shared_from_this(); + auto weak_this = std::weak_ptr<RpcConnection>(shared_this); + auto weak_req = std::weak_ptr<Request>(req); + + std::shared_ptr<std::string> payload = std::make_shared<std::string>(); + req->GetPacket(payload.get()); + if (!payload->empty()) { + assert(requests_on_fly_.find(req->call_id()) == requests_on_fly_.end()); + requests_on_fly_[req->call_id()] = req; + request_over_the_wire_ = req; + + req->timer().expires_from_now( + std::chrono::milliseconds(options_.rpc_timeout)); + req->timer().async_wait([weak_this, weak_req, this](const ::asio::error_code &ec) { + auto timeout_this = weak_this.lock(); + auto timeout_req = weak_req.lock(); + if (timeout_this && timeout_req) + this->HandleRpcTimeout(timeout_req, ec); + }); + + asio::async_write(socket_, asio::buffer(*payload), + [shared_this, this, payload](const ::asio::error_code &ec, + size_t size) { + OnSendCompleted(ec, size); + }); + } else { // Nothing to send for this request, inform the handler immediately + io_service().post( + // Never hold locks when calling a callback + [req]() { req->OnResponseArrived(nullptr, Status::OK()); } + ); + + // Reschedule to flush the next one + AsyncFlushPendingRequests(); + } +} + + +template <class Socket> +void RpcConnectionImpl<Socket>::OnRecvCompleted(const ::asio::error_code &original_ec, + size_t) { + using std::placeholders::_1; + using std::placeholders::_2; + std::lock_guard<std::mutex> state_lock(connection_state_lock_); + + ::asio::error_code my_ec(original_ec); + + LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called"); + + std::shared_ptr<RpcConnection> shared_this = shared_from_this(); + + if(event_handlers_) { + event_response event_resp = event_handlers_->call(FS_NN_READ_EVENT, cluster_name_.c_str(), 0); +#ifndef LIBHDFSPP_SIMULATE_ERROR_DISABLED + if (event_resp.response() == event_response::kTest_Error) { + my_ec = std::make_error_code(std::errc::network_down); + } +#endif + } + + switch (my_ec.value()) { + case 0: + // No errors + break; + case asio::error::operation_aborted: + // The event loop has been shut down. Ignore the error. 
+ return; + default: + LOG_WARN(kRPC, << "Network error during RPC read: " << my_ec.message()); + CommsError(ToStatus(my_ec)); + return; + } + + if (!current_response_state_) { /* start a new one */ + current_response_state_ = std::make_shared<Response>(); + } + + if (current_response_state_->state_ == Response::kReadLength) { + current_response_state_->state_ = Response::kReadContent; + auto buf = ::asio::buffer(reinterpret_cast<char *>(&current_response_state_->length_), + sizeof(current_response_state_->length_)); + asio::async_read( + socket_, buf, + [shared_this, this](const ::asio::error_code &ec, size_t size) { + OnRecvCompleted(ec, size); + }); + } else if (current_response_state_->state_ == Response::kReadContent) { + current_response_state_->state_ = Response::kParseResponse; + current_response_state_->length_ = ntohl(current_response_state_->length_); + current_response_state_->data_.resize(current_response_state_->length_); + asio::async_read( + socket_, ::asio::buffer(current_response_state_->data_), + [shared_this, this](const ::asio::error_code &ec, size_t size) { + OnRecvCompleted(ec, size); + }); + } else if (current_response_state_->state_ == Response::kParseResponse) { + // Check return status from the RPC response. We may have received a msg + // indicating a server side error. + + Status stat = HandleRpcResponse(current_response_state_); + + if(stat.get_server_exception_type() == Status::kStandbyException) { + // May need to bail out, connect to new NN, and restart loop + LOG_INFO(kRPC, << "Communicating with standby NN, attempting to reconnect"); + } + + current_response_state_ = nullptr; + StartReading(); + } +} + +template <class Socket> +void RpcConnectionImpl<Socket>::Disconnect() { + assert(lock_held(connection_state_lock_)); // Must be holding lock before calling + + LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called"); + + request_over_the_wire_.reset(); + if (connected_ == kConnecting || connected_ == kHandshaking || connected_ == kAuthenticating || connected_ == kConnected) { + // Don't print out errors, we were expecting a disconnect here + SafeDisconnect(get_asio_socket_ptr(&socket_)); + } + connected_ = kDisconnected; +} +} + +#endif
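A note on the connect path in the file above: ConnectAndFlush and ConnectComplete race two asynchronous operations, the socket_.async_connect() and the connect_timer_. Whichever finishes first drives ConnectComplete, which cancels the timer and uses the endpoint and state checks at its top to discard the stale loser. What follows is a minimal standalone sketch of the same pattern, not part of the patch; it assumes a reasonably recent standalone asio, and the 127.0.0.1:8020 endpoint and 500 ms timeout are placeholders:

#include <asio.hpp>
#include <chrono>
#include <iostream>

int main() {
  asio::io_service io;
  asio::ip::tcp::socket socket(io);
  asio::steady_timer connect_timer(io);
  asio::ip::tcp::endpoint endpoint(
      asio::ip::address::from_string("127.0.0.1"), 8020);

  // Path 1: the connect itself. On completion, cancel the timer so its
  // handler runs with operation_aborted and does nothing.
  socket.async_connect(endpoint, [&](const asio::error_code &ec) {
    connect_timer.cancel();
    std::cout << (ec ? "connect failed: " + ec.message() : "connected")
              << std::endl;
  });

  // Path 2: the timeout. A zero error code means the timer genuinely
  // expired; closing the socket forces the pending async_connect to
  // complete with an error, much as the patch reports host_unreachable
  // through ConnectComplete.
  connect_timer.expires_after(std::chrono::milliseconds(500));
  connect_timer.async_wait([&](const asio::error_code &ec) {
    if (!ec) {
      std::cout << "connect timed out" << std::endl;
      socket.close();
    }
  });

  io.run();
  return 0;
}

Because both handlers run on the io_service thread they execute sequentially, so each path can see what the other already did; the patch makes the same idea robust across retries by tracking current_endpoint_ and the kConnecting state rather than relying on cancellation alone.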
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index 651225a..22c0e74 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -16,13 +16,12 @@ * limitations under the License. */ #include "rpc_engine.h" -#include "rpc_connection.h" +#include "rpc_connection_impl.h" #include "common/util.h" #include "common/logging.h" #include "common/namenode_info.h" #include "optional.hpp" -#include <future> #include <algorithm> namespace hdfs { @@ -30,114 +29,6 @@ namespace hdfs { template <class T> using optional = std::experimental::optional<T>; -HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers, - ::asio::io_service *ioservice, - std::shared_ptr<LibhdfsEvents> event_handlers) - : enabled_(false), resolved_(false), - ioservice_(ioservice), event_handlers_(event_handlers) -{ - LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes"); - for(unsigned int i=0;i<servers.size();i++) - LOG_TRACE(kRPC, << servers[i].str()); - - if(servers.size() >= 2) { - LOG_TRACE(kRPC, << "Creating HA namenode tracker"); - if(servers.size() > 2) { - LOG_WARN(kRPC, << "Nameservice declares more than two nodes. Some won't be used."); - } - - active_info_ = servers[0]; - standby_info_ = servers[1]; - LOG_INFO(kRPC, << "Active namenode url = " << active_info_.uri.str()); - LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str()); - - enabled_ = true; - if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) { - resolved_ = true; - } - } -} - - -HANamenodeTracker::~HANamenodeTracker() { } - - -static std::string format_endpoints(const std::vector<::asio::ip::tcp::endpoint> &pts) { - std::stringstream ss; - for(unsigned int i=0; i<pts.size(); i++) - if(i == pts.size() - 1) - ss << pts[i]; - else - ss << pts[i] << ", "; - return ss.str(); -} - -// Pass in endpoint from current connection, this will do a reverse lookup -// and return the info for the standby node. It will also swap its state internally. 
-ResolvedNamenodeInfo HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint) { - LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoint); - mutex_guard swap_lock(swap_lock_); - - ResolvedNamenodeInfo failover_node; - - // Connected to standby, switch standby to active - if(IsCurrentActive_locked(current_endpoint)) { - std::swap(active_info_, standby_info_); - if(event_handlers_) - event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(), - reinterpret_cast<int64_t>(active_info_.uri.str().c_str())); - failover_node = active_info_; - } else if(IsCurrentStandby_locked(current_endpoint)) { - // Connected to standby - if(event_handlers_) - event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(), - reinterpret_cast<int64_t>(active_info_.uri.str().c_str())); - failover_node = active_info_; - } else { - // Invalid state, throw for testing - std::string ep1 = format_endpoints(active_info_.endpoints); - std::string ep2 = format_endpoints(standby_info_.endpoints); - - std::stringstream msg; - msg << "Looked for " << current_endpoint << " in\n"; - msg << ep1 << " and\n"; - msg << ep2 << std::endl; - - LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str() << ". Bailing out."); - throw std::runtime_error(msg.str()); - } - - if(failover_node.endpoints.empty()) { - LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() << " attempting to resolve again"); - if(!ResolveInPlace(ioservice_, failover_node)) { - LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << failover_node.uri.str() - << "failed. Please make sure your configuration is up to date."); - } - } - return failover_node; -} - -bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const { - for(unsigned int i=0;i<active_info_.endpoints.size();i++) { - if(ep.address() == active_info_.endpoints[i].address()) { - if(ep.port() != active_info_.endpoints[i].port()) - LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << active_info_.endpoints[i] << " trying anyway.."); - return true; - } - } - return false; -} - -bool HANamenodeTracker::IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const { - for(unsigned int i=0;i<standby_info_.endpoints.size();i++) { - if(ep.address() == standby_info_.endpoints[i].address()) { - if(ep.port() != standby_info_.endpoints[i].port()) - LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << standby_info_.endpoints[i] << " trying anyway.."); - return true; - } - } - return false; -} RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options, const std::string &client_name, const std::string &user_name, @@ -276,19 +167,6 @@ void RpcEngine::AsyncRpc( conn_->AsyncRpc(method_name, req, resp, handler); } -Status RpcEngine::Rpc( - const std::string &method_name, const ::google::protobuf::MessageLite *req, - const std::shared_ptr<::google::protobuf::MessageLite> &resp) { - - LOG_TRACE(kRPC, << "RpcEngine::Rpc called"); - - auto stat = std::make_shared<std::promise<Status>>(); - std::future<Status> future(stat->get_future()); - AsyncRpc(method_name, req, resp, - [stat](const Status &status) { stat->set_value(status); }); - return future.get(); -} - std::shared_ptr<RpcConnection> RpcEngine::NewConnection() { LOG_DEBUG(kRPC, << "RpcEngine::NewConnection called"); @@ -304,7 +182,6 @@ std::shared_ptr<RpcConnection> RpcEngine::InitializeConnection() return result; } - void RpcEngine::AsyncRpcCommsError( const Status &status, 
std::shared_ptr<RpcConnection> failedConnection, http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h index b4aef00..dc4054e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -25,25 +25,19 @@ #include "common/retry_policy.h" #include "common/libhdfs_events_impl.h" #include "common/util.h" -#include "common/continuation/asio.h" -#include "common/logging.h" #include "common/new_delete.h" #include "common/namenode_info.h" +#include "namenode_tracker.h" #include <google/protobuf/message_lite.h> -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream_impl_lite.h> #include <asio/ip/tcp.hpp> #include <asio/deadline_timer.hpp> #include <atomic> #include <memory> -#include <unordered_map> #include <vector> -#include <deque> #include <mutex> -#include <future> namespace hdfs { @@ -64,193 +58,8 @@ typedef const std::function<void(const Status &)> RpcCallback; class LockFreeRpcEngine; class RpcConnection; class SaslProtocol; - -/* - * Internal bookkeeping for an outstanding request from the consumer. - * - * Threading model: not thread-safe; should only be accessed from a single - * thread at a time - */ -class Request { - public: - MEMCHECKED_CLASS(Request) - typedef std::function<void(::google::protobuf::io::CodedInputStream *is, - const Status &status)> Handler; - - Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id, - const std::string &request, Handler &&callback); - Request(LockFreeRpcEngine *engine, const std::string &method_name, int call_id, - const ::google::protobuf::MessageLite *request, Handler &&callback); - - // Null request (with no actual message) used to track the state of an - // initial Connect call - Request(LockFreeRpcEngine *engine, Handler &&handler); - - int call_id() const { return call_id_; } - std::string method_name() const { return method_name_; } - ::asio::deadline_timer &timer() { return timer_; } - int IncrementRetryCount() { return retry_count_++; } - int IncrementFailoverCount(); - void GetPacket(std::string *res) const; - void OnResponseArrived(::google::protobuf::io::CodedInputStream *is, - const Status &status); - - int get_failover_count() {return failover_count_;} - - std::string GetDebugString() const; - - private: - LockFreeRpcEngine *const engine_; - const std::string method_name_; - const int call_id_; - - ::asio::deadline_timer timer_; - std::string payload_; - const Handler handler_; - - int retry_count_; - int failover_count_; -}; - -/* - * Encapsulates a persistent connection to the NameNode, and the sending of - * RPC requests and evaluating their responses. - * - * Can have multiple RPC requests in-flight simultaneously, but they are - * evaluated in-order on the server side in a blocking manner. - * - * Threading model: public interface is thread-safe - * All handlers passed in to method calls will be called from an asio thread, - * and will not be holding any internal RpcConnection locks. 
- */ -class RpcConnection : public std::enable_shared_from_this<RpcConnection> { - public: - MEMCHECKED_CLASS(RpcConnection) - RpcConnection(LockFreeRpcEngine *engine); - virtual ~RpcConnection(); - - // Note that a single server can have multiple endpoints - especially both - // an ipv4 and ipv6 endpoint - virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server, - const AuthInfo & auth_info, - RpcCallback &handler) = 0; - virtual void ConnectAndFlush(const std::vector<::asio::ip::tcp::endpoint> &server) = 0; - virtual void Disconnect() = 0; - - void StartReading(); - void AsyncRpc(const std::string &method_name, - const ::google::protobuf::MessageLite *req, - std::shared_ptr<::google::protobuf::MessageLite> resp, - const RpcCallback &handler); - - void AsyncRpc(const std::vector<std::shared_ptr<Request> > & requests); - - // Enqueue requests before the connection is connected. Will be flushed - // on connect - void PreEnqueueRequests(std::vector<std::shared_ptr<Request>> requests); - - // Put requests at the front of the current request queue - void PrependRequests_locked(std::vector<std::shared_ptr<Request>> requests); - - void SetEventHandlers(std::shared_ptr<LibhdfsEvents> event_handlers); - void SetClusterName(std::string cluster_name); - - LockFreeRpcEngine *engine() { return engine_; } - ::asio::io_service &io_service(); - - protected: - struct Response { - enum ResponseState { - kReadLength, - kReadContent, - kParseResponse, - } state_; - unsigned length_; - std::vector<char> data_; - - std::unique_ptr<::google::protobuf::io::ArrayInputStream> ar; - std::unique_ptr<::google::protobuf::io::CodedInputStream> in; - - Response() : state_(kReadLength), length_(0) {} - }; - - - // Initial handshaking protocol: connect->handshake-->(auth)?-->context->connected - virtual void SendHandshake(RpcCallback &handler) = 0; - void HandshakeComplete(const Status &s); - void AuthComplete(const Status &s, const AuthInfo & new_auth_info); - void AuthComplete_locked(const Status &s, const AuthInfo & new_auth_info); - virtual void SendContext(RpcCallback &handler) = 0; - void ContextComplete(const Status &s); - - - virtual void OnSendCompleted(const ::asio::error_code &ec, - size_t transferred) = 0; - virtual void OnRecvCompleted(const ::asio::error_code &ec, - size_t transferred) = 0; - virtual void FlushPendingRequests()=0; // Synchronously write the next request - - void AsyncRpc_locked( - const std::string &method_name, - const ::google::protobuf::MessageLite *req, - std::shared_ptr<::google::protobuf::MessageLite> resp, - const RpcCallback &handler); - void SendRpcRequests(const std::vector<std::shared_ptr<Request> > & requests); - void AsyncFlushPendingRequests(); // Queue requests to be flushed at a later time - - - - std::shared_ptr<std::string> PrepareHandshakePacket(); - std::shared_ptr<std::string> PrepareContextPacket(); - static std::string SerializeRpcRequest( - const std::string &method_name, - const ::google::protobuf::MessageLite *req); - Status HandleRpcResponse(std::shared_ptr<Response> response); - void HandleRpcTimeout(std::shared_ptr<Request> req, - const ::asio::error_code &ec); - void CommsError(const Status &status); - - void ClearAndDisconnect(const ::asio::error_code &ec); - std::shared_ptr<Request> RemoveFromRunningQueue(int call_id); - - LockFreeRpcEngine *const engine_; - std::shared_ptr<Response> current_response_state_; - AuthInfo auth_info_; - - // Connection can have deferred connection, especially when we're pausing - // during retry - enum 
ConnectedState { - kNotYetConnected, - kConnecting, - kHandshaking, - kAuthenticating, - kConnected, - kDisconnected - }; - static std::string ToString(ConnectedState connected); - ConnectedState connected_; - - // State machine for performing a SASL handshake - std::shared_ptr<SaslProtocol> sasl_protocol_; - // The request being sent over the wire; will also be in requests_on_fly_ - std::shared_ptr<Request> request_over_the_wire_; - // Requests to be sent over the wire - std::deque<std::shared_ptr<Request>> pending_requests_; - // Requests to be sent over the wire during authentication; not retried if - // there is a connection error - std::deque<std::shared_ptr<Request>> auth_requests_; - // Requests that are waiting for responses - typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap; - RequestOnFlyMap requests_on_fly_; - std::shared_ptr<LibhdfsEvents> event_handlers_; - std::string cluster_name_; - - // Lock for mutable parts of this class that need to be thread safe - std::mutex connection_state_lock_; - - friend class SaslProtocol; -}; - +class RpcConnection; +class Request; /* * These methods of the RpcEngine will never acquire locks, and are safe for @@ -259,6 +68,7 @@ class RpcConnection : public std::enable_shared_from_this<RpcConnection> { class LockFreeRpcEngine { public: MEMCHECKED_CLASS(LockFreeRpcEngine) + /* Enqueues a CommsError without acquiring a lock*/ virtual void AsyncRpcCommsError(const Status &status, std::shared_ptr<RpcConnection> failedConnection, @@ -279,54 +89,6 @@ public: /* - * Tracker gives the RpcEngine a quick way to use an endpoint that just - * failed in order to lookup a set of endpoints for a failover node. - * - * Note: For now this only deals with 2 NameNodes, but that's the default - * anyway. - */ -class HANamenodeTracker { - public: - HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers, - ::asio::io_service *ioservice, - std::shared_ptr<LibhdfsEvents> event_handlers_); - - virtual ~HANamenodeTracker(); - - bool is_enabled() const { return enabled_; } - bool is_resolved() const { return resolved_; } - - // Get node opposite of the current one if possible (swaps active/standby) - // Note: This will always mutate internal state. Use IsCurrentActive/Standby to - // get info without changing state - ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint); - - bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const; - bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const; - - private: - // If HA should be enabled, according to our options and runtime info like # nodes provided - bool enabled_; - // If we were able to resolve at least 1 HA namenode - bool resolved_; - - // Keep service in case a second round of DNS lookup is required - ::asio::io_service *ioservice_; - - // Event handlers, for now this is the simplest place to catch all failover events - // and push info out to client application. Possibly move into RPCEngine. - std::shared_ptr<LibhdfsEvents> event_handlers_; - - // Only support 1 active and 1 standby for now. - ResolvedNamenodeInfo active_info_; - ResolvedNamenodeInfo standby_info_; - - // Aquire when switching from active-standby - std::mutex swap_lock_; -}; - - -/* * An engine for reliable communication with a NameNode. Handles connection, * retry, and (someday) failover of the requested messages. 
 * @@ -360,10 +122,6 @@ class RpcEngine : public LockFreeRpcEngine { const std::shared_ptr<::google::protobuf::MessageLite> &resp, const std::function<void(const Status &)> &handler); - Status Rpc(const std::string &method_name, - const ::google::protobuf::MessageLite *req, - const std::shared_ptr<::google::protobuf::MessageLite> &resp); - void Shutdown(); /* Enqueues a CommsError without acquiring a lock*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc index b35a2f9..83d4f88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/sasl_protocol.cc @@ -17,6 +17,7 @@ */ #include "rpc_engine.h" +#include "rpc_connection.h" #include "common/logging.h" #include "sasl_engine.h" http://git-wip-us.apache.org/repos/asf/hadoop/blob/d75104ea/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc index 08218f6..66a5f1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc @@ -19,7 +19,7 @@ #include "mock_connection.h" #include "test.pb.h" #include "RpcHeader.pb.h" -#include "rpc/rpc_connection.h" +#include "rpc/rpc_connection_impl.h" #include "common/namenode_info.h" #include <google/protobuf/io/coded_stream.h>
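One removal above deserves a closing note: the synchronous RpcEngine::Rpc dropped from rpc_engine.cc bridged the asynchronous AsyncRpc into a blocking call with a shared std::promise and its matching std::future. Below is a self-contained sketch of that bridge pattern; the Status struct and AsyncRpc stub are illustrative stand-ins for the libhdfspp types, not the real ones:

#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

// Stand-in for hdfs::Status: an empty message means success.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Stand-in for the engine's async entry point; here the "RPC" simply
// succeeds from another thread, the way a handler would fire from an
// asio worker.
void AsyncRpc(const std::function<void(const Status &)> &handler) {
  std::thread([handler] { handler(Status{}); }).detach();
}

// The bridge the removed RpcEngine::Rpc used: share a promise with the
// completion handler, then block on the matching future.
Status SyncRpc() {
  auto stat = std::make_shared<std::promise<Status>>();
  std::future<Status> future(stat->get_future());
  AsyncRpc([stat](const Status &status) { stat->set_value(status); });
  return future.get();  // blocks until the handler runs
}

int main() {
  std::cout << (SyncRpc().ok() ? "rpc ok" : "rpc failed") << std::endl;
  return 0;
}

The usual caveat applies: if the blocking call is issued from the thread that services the event loop, the completion handler can never run and future.get() deadlocks, which is why wrappers like this must stay off the asio threads.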